From db3f5b04d89c30d545512ddceddbca40ea0a2795 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 4 Nov 2020 17:43:04 +1100
Subject: [PATCH v16] Support 2PC txn - spoolfile.

This patch only refactors to isolate the streaming spool-file processing to a separate function.
Later, two-phase commit logic will require this common processing to be called from multiple places.
---
 src/backend/replication/logical/worker.c | 58 +++++++++++++++++++++-----------
 1 file changed, 38 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0468491..d282336 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -244,6 +244,8 @@ static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
 									   LogicalRepRelMapEntry *relmapentry,
 									   CmdType operation);
 
+static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -933,30 +935,21 @@ apply_handle_stream_abort(StringInfo s)
 }
 
 /*
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
+ * Returns how many changes were applied.
  */
-static void
-apply_handle_stream_commit(StringInfo s)
+static int
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 {
-	TransactionId xid;
 	StringInfoData s2;
 	int			nchanges;
 	char		path[MAXPGPATH];
 	char	   *buffer = NULL;
 	bool		found;
-	LogicalRepCommitData commit_data;
 	StreamXidHash *ent;
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
-	Assert(!in_streamed_transaction);
-
-	xid = logicalrep_read_stream_commit(s, &commit_data);
-
-	elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
-	ensure_transaction();
-
 	/*
 	 * Allocate file handle and memory required to process all the messages in
 	 * TopTransactionContext to avoid them getting reset after each message is
@@ -964,7 +957,7 @@ apply_handle_stream_commit(StringInfo s)
 	 */
 	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
 
-	/* open the spool file for the committed transaction */
+	/* open the spool file for the committed/prepared transaction */
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
 	ent = (StreamXidHash *) hash_search(xidhash,
@@ -979,7 +972,7 @@ apply_handle_stream_commit(StringInfo s)
 
 	MemoryContextSwitchTo(oldcxt);
 
-	remote_final_lsn = commit_data.commit_lsn;
+	remote_final_lsn = lsn;
 
 	/*
 	 * Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1048,6 +1041,35 @@ apply_handle_stream_commit(StringInfo s)
 
 	BufFileClose(fd);
 
+	pfree(buffer);
+	pfree(s2.data);
+
+	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
+		 nchanges, path);
+
+	return nchanges;
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+	TransactionId xid;
+	LogicalRepCommitData commit_data;
+	int nchanges = 0;
+
+	Assert(!in_streamed_transaction);
+
+	xid = logicalrep_read_stream_commit(s, &commit_data);
+
+	elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+	ensure_transaction();
+
+	nchanges = apply_spooled_messages(xid, commit_data.commit_lsn);
+
 	/*
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
@@ -1055,16 +1077,12 @@ apply_handle_stream_commit(StringInfo s)
 	replorigin_session_origin_lsn = commit_data.end_lsn;
 	replorigin_session_origin_timestamp = commit_data.committime;
 
-	pfree(buffer);
-	pfree(s2.data);
-
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
 	store_flush_position(commit_data.end_lsn);
 
-	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
-		 nchanges, path);
+	elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges);
 
 	in_remote_transaction = false;
 
-- 
1.8.3.1

