From d7a66453369bc4c05a0d561ac9f0f85246fe119a Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 4 Nov 2020 18:05:48 +1100
Subject: [PATCH v16] Support 2PC txn - pgoutput.

This patch adds support in the pgoutput plugin and subscriber for handling
of two-phase commits.

Includes pgoutput changes.

Includes subscriber changes.
---
 src/backend/access/transam/twophase.c       |  27 ++++
 src/backend/replication/logical/proto.c     | 141 ++++++++++++++++-
 src/backend/replication/logical/worker.c    | 227 ++++++++++++++++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c |  74 +++++++++
 src/include/access/twophase.h               |   1 +
 src/include/replication/logicalproto.h      |  37 ++++-
 6 files changed, 505 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 7940060..129afe9 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -548,6 +548,33 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 }
 
 /*
+ * LookupGXact
+ *		Check if the prepared transaction with the given GID is	around
+ */
+bool
+LookupGXact(const char *gid)
+{
+	int			i;
+	bool		found = false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs */
+		if (gxact->valid && strcmp(gxact->gid, gid) == 0)
+		{
+			found = true;
+			break;
+		}
+
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	return found;
+}
+
+/*
  * LockGXact
  *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
  */
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index fdb3118..26e43f7 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -78,7 +78,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
 
-	/* send the flags field (unused for now) */
+	/* send the flags field */
 	pq_sendbyte(out, flags);
 
 	/* send fields */
@@ -106,6 +106,145 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 }
 
 /*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						 XLogRecPtr prepare_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+	/*
+	 * This should only ever happen for two-phase commit transactions. In which case we
+	 * expect to have a valid GID.
+	 */
+	Assert(rbtxn_prepared(txn));
+	Assert(txn->gid != NULL);
+
+	/*
+	 * Flags are determined from the state of the transaction. We know we
+	 * always get PREPARE first and then [COMMIT|ROLLBACK] PREPARED, so if
+	 * it's already marked as committed then it has to be COMMIT PREPARED (and
+	 * likewise for abort / ROLLBACK PREPARED).
+	 */
+	if (rbtxn_commit_prepared(txn))
+		flags = LOGICALREP_IS_COMMIT_PREPARED;
+	else if (rbtxn_rollback_prepared(txn))
+		flags = LOGICALREP_IS_ROLLBACK_PREPARED;
+	else
+		flags = LOGICALREP_IS_PREPARE;
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data)
+{
+	/* read flags */
+	uint8		flags = pq_getmsgbyte(in);
+
+	if (!PrepareFlagsAreValid(flags))
+		elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+	/* set the action (reuse the constants used for the flags) */
+	prepare_data->prepare_type = flags;
+
+	/* read fields */
+	prepare_data->prepare_lsn = pq_getmsgint64(in);
+	prepare_data->end_lsn = pq_getmsgint64(in);
+	prepare_data->preparetime = pq_getmsgint64(in);
+
+	/* read gid (copy it into a pre-allocated buffer) */
+	strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write STREAM PREPARE to the output stream.
+ */
+void
+logicalrep_write_stream_prepare(StringInfo out,
+								ReorderBufferTXN *txn,
+								XLogRecPtr prepare_lsn)
+{
+	uint8	flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_PREPARE);
+
+	/*
+	 * This should only ever happen for two-phase transactions. In which case we
+	 * expect to have a valid GID.
+	 */
+	Assert(rbtxn_prepared(txn));
+	Assert(txn->gid != NULL);
+
+	/*
+	 * For streaming APIs only PREPARE is supported. [COMMIT|ROLLBACK] PREPARED
+	 * uses non-streaming APIs
+	 */
+	flags = LOGICALREP_IS_PREPARE;
+
+	/* transaction ID */
+	Assert(TransactionIdIsValid(txn->xid));
+	pq_sendint32(out, txn->xid);
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read STREAM PREPARE from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_prepare(StringInfo in, LogicalRepPrepareData *prepare_data)
+{
+	TransactionId	xid;
+	uint8			flags;
+
+	xid = pq_getmsgint(in, 4);
+
+	/* read flags */
+	flags = pq_getmsgbyte(in);
+
+	if (flags != LOGICALREP_IS_PREPARE)
+		elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+	/* set the action (reuse the constants used for the flags) */
+	prepare_data->prepare_type = flags;
+
+	/* read fields */
+	prepare_data->prepare_lsn = pq_getmsgint64(in);
+	prepare_data->end_lsn = pq_getmsgint64(in);
+	prepare_data->preparetime = pq_getmsgint64(in);
+
+	/* read gid (copy it into a pre-allocated buffer) */
+	strcpy(prepare_data->gid, pq_getmsgstring(in));
+
+	return xid;
+}
+
+/*
  * Write ORIGIN to the output stream.
  */
 void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d282336..e99cf74 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -742,6 +742,225 @@ apply_handle_commit(StringInfo s)
 }
 
 /*
+ * Called from apply_handle_prepare to handle a PREPARE TRANSACTION.
+ */
+static void
+apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data)
+{
+	Assert(prepare_data->prepare_lsn == remote_final_lsn);
+
+	/* The synchronization worker runs in single transaction. */
+	if (IsTransactionState() && !am_tablesync_worker())
+	{
+		/* End the earlier transaction and start a new one */
+		BeginTransactionBlock();
+		CommitTransactionCommand();
+		StartTransactionCommand();
+
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = prepare_data->end_lsn;
+		replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+		PrepareTransactionBlock(prepare_data->gid);
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+
+		store_flush_position(prepare_data->end_lsn);
+	}
+	else
+	{
+		/* Process any invalidation messages that might have accumulated. */
+		AcceptInvalidationMessages();
+		maybe_reread_subscription();
+	}
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Called from apply_handle_prepare to handle a COMMIT PREPARED of a previously
+ * PREPARED transaction.
+ */
+static void
+apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data)
+{
+	/* there is no transaction when COMMIT PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data->end_lsn;
+	replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+	FinishPreparedTransaction(prepare_data->gid, true);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Called from apply_handle_prepare to handle a ROLLBACK PREPARED of a previously
+ * PREPARED TRANSACTION.
+ */
+static void
+apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data)
+{
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data->end_lsn;
+	replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+	/*
+	 * During logical decoding, on the apply side, it's possible that a
+	 * prepared transaction got aborted while decoding. In that case, we stop
+	 * the decoding and abort the transaction immediately. However the
+	 * ROLLBACK prepared processing still reaches the subscriber. In that case
+	 * it's ok to have a missing gid
+	 */
+	if (LookupGXact(prepare_data->gid))
+	{
+		/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
+		ensure_transaction();
+		FinishPreparedTransaction(prepare_data->gid, false);
+		CommitTransactionCommand();
+	}
+
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+	LogicalRepPrepareData prepare_data;
+
+	logicalrep_read_prepare(s, &prepare_data);
+
+	switch (prepare_data.prepare_type)
+	{
+		case LOGICALREP_IS_PREPARE:
+			apply_handle_prepare_txn(&prepare_data);
+			break;
+
+		case LOGICALREP_IS_COMMIT_PREPARED:
+			apply_handle_commit_prepared_txn(&prepare_data);
+			break;
+
+		case LOGICALREP_IS_ROLLBACK_PREPARED:
+			apply_handle_rollback_prepared_txn(&prepare_data);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected type of prepare message: %d",
+							prepare_data.prepare_type)));
+	}
+}
+
+/*
+ * Handle STREAM PREPARE.
+ *
+ * Logic is in two parts:
+ * 1. Replay all the spooled operations
+ * 2. Mark the transaction as prepared
+ */
+static void
+apply_handle_stream_prepare(StringInfo s)
+{
+	int nchanges = 0;
+	LogicalRepPrepareData prepare_data;
+	TransactionId xid;
+
+	Assert(!in_streamed_transaction);
+
+	xid = logicalrep_read_stream_prepare(s, &prepare_data);
+	elog(DEBUG1, "received prepare for streamed transaction %u", xid);
+
+	/*
+	 * This should be a PREPARE only. The COMMIT PREPARED and ROLLBACK PREPARED
+	 * for streaming are handled by the non-streaming APIs.
+	 */
+	Assert(prepare_data.prepare_type == LOGICALREP_IS_PREPARE);
+
+	/*
+	 * ========================================
+	 * 1. Replay all the spooled operations
+	 * - This code is same as what apply_handle_stream_commit does for NON two-phase stream commit
+	 * ========================================
+	 */
+
+	ensure_transaction();
+
+	nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
+
+	/*
+	 * ========================================
+	 * 2. Mark the transaction as prepared.
+	 * - This code is same as what apply_handle_prepare_txn does for two-phase prepare of the non-streamed tx
+	 * ========================================
+	 */
+	BeginTransactionBlock();
+	CommitTransactionCommand();
+	StartTransactionCommand();
+
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data.end_lsn;
+	replorigin_session_origin_timestamp = prepare_data.preparetime;
+
+	PrepareTransactionBlock(prepare_data.gid);
+	CommitTransactionCommand();
+
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data.end_lsn);
+
+	elog(DEBUG1, "apply_handle_stream_prepare_txn: replayed %d (all) changes.", nchanges);
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data.end_lsn);
+
+	/* unlink the files with serialized changes and subxact info */
+	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
  * Handle ORIGIN message.
  *
  * TODO, support tracking of multiple origins
@@ -1969,6 +2188,14 @@ apply_dispatch(StringInfo s)
 		case LOGICAL_REP_MSG_STREAM_COMMIT:
 			apply_handle_stream_commit(s);
 			return;
+
+		case LOGICAL_REP_MSG_PREPARE:
+			apply_handle_prepare(s);
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			apply_handle_stream_prepare(s);
+			return;
 	}
 
 	ereport(ERROR,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c997ae..9f27234 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+								 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
@@ -57,6 +63,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr commit_lsn);
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -143,6 +151,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->commit_cb = pgoutput_commit_txn;
+
+	cb->prepare_cb = pgoutput_prepare_txn;
+	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
@@ -153,6 +165,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_truncate_cb = pgoutput_truncate;
+	/* transaction streaming - two-phase commit */
+	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
 }
 
 static void
@@ -378,6 +392,48 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * ROLLBACK PREPARED callback
+ */
+static void
+pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
  * Write the current schema of the relation and its ancestor (if any) if not
  * done yet.
  */
@@ -857,6 +913,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 }
 
 /*
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+							ReorderBufferTXN *txn,
+							XLogRecPtr prepare_lsn)
+{
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx);
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
  * Initialize the relation schema sync cache for a decoding session.
  *
  * The hash table is destroyed at the end of a decoding session. While
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 2ca71c3..b2628ea 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -44,6 +44,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
 extern void StartPrepare(GlobalTransaction gxact);
 extern void EndPrepare(GlobalTransaction gxact);
 extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
+extern bool LookupGXact(const char *gid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 												 int *nxids_p);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index cca13da..d28292d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,10 +54,12 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
-	LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+	LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
 } LogicalRepMsgType;
 
 /*
@@ -114,6 +116,7 @@ typedef struct LogicalRepBeginData
 	TransactionId xid;
 } LogicalRepBeginData;
 
+/* Commit (and abort) information */
 typedef struct LogicalRepCommitData
 {
 	XLogRecPtr	commit_lsn;
@@ -121,6 +124,28 @@ typedef struct LogicalRepCommitData
 	TimestampTz committime;
 } LogicalRepCommitData;
 
+
+/* Prepare protocol information */
+typedef struct LogicalRepPrepareData
+{
+	uint8		prepare_type;
+	XLogRecPtr	prepare_lsn;
+	XLogRecPtr	end_lsn;
+	TimestampTz preparetime;
+	char		gid[GIDSIZE];
+} LogicalRepPrepareData;
+
+/* types of the prepare protocol message */
+#define LOGICALREP_IS_PREPARE			0x01
+#define LOGICALREP_IS_COMMIT_PREPARED	0x02
+#define LOGICALREP_IS_ROLLBACK_PREPARED	0x04
+
+/* prepare can be exactly one of PREPARE, [COMMIT|ROLLBACK] PREPARED*/
+#define PrepareFlagsAreValid(flags) \
+	(((flags) == LOGICALREP_IS_PREPARE) || \
+	 ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \
+	 ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED))
+
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
 extern void logicalrep_read_begin(StringInfo in,
 								  LogicalRepBeginData *begin_data);
@@ -128,6 +153,10 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 									XLogRecPtr commit_lsn);
 extern void logicalrep_read_commit(StringInfo in,
 								   LogicalRepCommitData *commit_data);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+									  XLogRecPtr prepare_lsn);
+extern void logicalrep_read_prepare(StringInfo in,
+									LogicalRepPrepareData * prepare_data);
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
 									XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
@@ -171,4 +200,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
 										 TransactionId *subxid);
 
+extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
+											XLogRecPtr prepare_lsn);
+extern TransactionId logicalrep_read_stream_prepare(StringInfo in,
+													LogicalRepPrepareData *prepare_data);
+
+
 #endif							/* LOGICAL_PROTO_H */
-- 
1.8.3.1

