From 9adb04bdb827f44a91e45d53b1fad5a02213777c Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 15 Dec 2020 20:46:27 +1100
Subject: [PATCH v3] 2PC-Solution1-WIP-20201215.

This patch applies onto the v30 patch set [1] from other 2PC thread:
[1] https://www.postgresql.org/message-id/CAFPTHDYA8yE6tEmQ2USYS68kNt%2BkM%3DSwKgj%3Djy4AvFD5e9-UTQ%40mail.gmail.com

====

Coded / WIP:

* tablesync slot is now permanent instead of temporary

* the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions

* tablesync worker now allows multiple transactions instead of a single transaction

* a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart.

* if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase.

TODO / Known Issues:

* The tablesync replication origin/lsn logic all needs to be updated so that tablesync knows where to restart based on information held by the now permanent slot.

* the current implementation of tablesync slot drop (e.g. from DROP SUBSCRIPTION or finish_sync_worker) regenerates the tablesync slot name so it knows which slot to drop. The current code may be OK for normal use cases, but after an ALTER SUBSCRIPTION ... SET (slot_name = newname) it would be unable to find the tablesync slot.

* help / comments / cleanup

* There is temporary "!!>>" excessive logging of mine scattered around which I added to help my testing during development
---
 src/backend/commands/subscriptioncmds.c     | 108 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c | 163 ++++++++++++++++++++++------
 src/backend/replication/logical/worker.c    |  21 +---
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/commands/subscriptioncmds.h     |   1 +
 src/include/replication/slot.h              |   1 +
 6 files changed, 249 insertions(+), 46 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b0745d5..e2b9618 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -1070,6 +1071,41 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
+		/* Is this a tablesync worker? If yes, drop the tablesync's slot. */
+		if (OidIsValid(w->relid))
+		{
+			/* FIXME 1 - The slotname check below is a workaround needed because the tablesync slot name
+			 * is derived from the subscription slot name, so if that was set to slot_name = NONE then we
+			 * can no longer do that calculation to get the tablesync slot name.
+			 *
+			 * FIXME 2 - If the subscription slot name changes from 'aaa' to 'bbb' then it will not be possible
+			 * to get back to those tablesync slots. Some redesign is needed (e.g. store the tablesync slot name
+			 * somewhere) to avoid this trouble...
+			 */
+			if (slotname)
+			{
+				extern void ReplicationSlotDropAtPubNode(
+						WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+				/* Calculate the name of the tablesync slot */
+				char *syncslotname = ReplicationSlotNameForTablesync(slotname, w->subid, w->relid);
+
+				elog(LOG, "!!>> DROP SUBSCRIPTION - now dropping the tablesync slot \"%s\".", syncslotname);
+				ReplicationSlotDropAtPubNode(
+								NULL,
+								conninfo, /* use conninfo to make a new connection. */
+								subname,
+								syncslotname);
+
+				pfree(syncslotname);
+			}
+			else
+			{
+				elog(LOG, "!!>> DROP SUBSCRIPTION - no slotname for relid %u.", w->relid);
+			}
+		}
+
+		/* Stop the worker. */
 		logicalrep_worker_stop(w->subid, w->relid);
 	}
 	list_free(subworkers);
@@ -1144,6 +1180,78 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+
+/*
+ * Drop replication slot "slotname" at the publisher via DROP_REPLICATION_SLOT.
+ *
+ * Exactly one of wrconn_given / conninfo must be non-NULL: a given connection
+ * is used as-is, otherwise one is opened from conninfo and closed afterwards.
+ * An ERROR raised while executing the drop command is caught, not propagated.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname)
+{
+	StringInfoData cmd;
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
+
+	/*
+	 * Use the caller's connection if given; otherwise connect using conninfo.
+	 * NOTE(review): "wrconn" is not declared in this function - confirm which variable this assigns.
+	 */
+	if (wrconn_given != NULL)
+	{
+		Assert (conninfo == NULL);
+		wrconn = wrconn_given;
+	}
+	else
+	{
+		char	   *err = NULL;
+
+		Assert(conninfo != NULL);
+		wrconn = walrcv_connect(conninfo, true, subname, &err);
+
+		/* A failed connection is reported outside the PG_TRY below, so this ERROR does propagate. */
+		if (wrconn == NULL)
+			ereport(ERROR,
+					(errmsg("could not connect to publisher when attempting to "
+							"drop the replication slot \"%s\"", slotname),
+					 errdetail("The error was: %s", err)));
+	}
+
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+
+		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							slotname),
+					 errdetail("The error was: %s", res->err)));
+		else
+			ereport(LOG,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							slotname)));
+
+		walrcv_clear_result(res);	/* not reached when the ERROR above is raised */
+	}
+	PG_CATCH();
+	{
+		/* NOP. Just gobble any ERROR. NOTE(review): no FlushErrorState() or re-throw here - confirm this is safe. */
+	}
+	PG_END_TRY();
+
+	/* Disconnect the connection (unless using one passed) */
+	if (wrconn_given == NULL)
+		walrcv_disconnect(wrconn);
+
+	pfree(cmd.data);
+}
+
 /*
  * Internal workhorse for changing a subscription owner
  */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1904f34..7378cb6 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,6 +102,7 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -139,6 +140,28 @@ finish_sync_worker(void)
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/*
+	 * Clean up the tablesync slot.
+	 */
+	{
+		extern void ReplicationSlotDropAtPubNode(
+			WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+		/* Calculate the name of the tablesync slot */
+		char *syncslotname = ReplicationSlotNameForTablesync(
+						MySubscription->slotname,
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
+
+		elog(LOG, "!!>> Dropping the tablesync slot \"%s\".", syncslotname);
+		ReplicationSlotDropAtPubNode(
+						wrconn,
+						NULL, /* use the current connection. */
+						MySubscription->name, syncslotname);
+
+		pfree(syncslotname);
+	}
+
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
@@ -270,8 +293,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
-
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -284,6 +305,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+ 		 * UpdateSubscriptionRelState must be called within a transaction.
+ 		 * That transaction will be ended within the finish_sync_worker().
+ 		 */
+ 		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -808,6 +838,35 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+/*
+ * Build the tablesync slot name: "<subslotname, truncated>_<suboid>_sync_<relid>".
+ *
+ * The returned slot name is palloc'ed in current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid)
+{
+	char *syncslotname;
+
+	/*
+	 * The slot name is limited to NAMEDATALEN - 1 characters, so cut the
+	 * subscription slot name to NAMEDATALEN - 28 chars and append
+	 * _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the NAMEDATALEN
+	 * on the remote that matters, but this scheme will also work reasonably
+	 * if that is different.)  NOTE(review): assumes subslotname is non-NULL.
+	 */
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
+
+	syncslotname = psprintf("%.*s_%u_sync_%u",
+						NAMEDATALEN - 28,
+						subslotname,
+						suboid,
+						relid);
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -825,6 +884,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	bool		copied_ok;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -850,16 +910,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
+	/* Calculate the name of the tablesync slot */
+	slotname = ReplicationSlotNameForTablesync(
 						MySubscription->slotname,
 						MySubscription->oid,
 						MyLogicalRepWorker->relid);
@@ -875,7 +927,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE);
+
+	/*
+	 * The COPY phase was previously done, but tablesync then crashed (or
+	 * otherwise exited) before it was able to finish normally.
+	 */
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE)
+	{
+		elog(LOG, "!!>> tablesync relstate was SUBREL_STATE_COPYDONE.");
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -891,9 +954,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -919,29 +979,70 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot.  This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
+	elog(LOG, "!!>> LogicalRepSyncTableStart calls walrcv_create_slot for \"%s\".", slotname);
+	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	copied_ok = false;
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
+
+		copied_ok = true;
+	}
+	PG_FINALLY();
+	{
+		/* If something failed during copy table then cleanup the created slot. */
+		if (!copied_ok)
+		{
+			extern void ReplicationSlotDropAtPubNode(
+				WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+			elog(LOG, "!!>> The tablesync copy failed. Drop the tablesync slot \"%s\".", slotname);
+			ReplicationSlotDropAtPubNode(
+							wrconn,
+							NULL, /* use the current connection. */
+							MySubscription->name,
+							slotname);
+
+			pfree(slotname);
+		}
+	}
+	PG_END_TRY();
 
-	table_close(rel, NoLock);
+	CommitTransactionCommand();
+
+	/* Update the persisted state to flag COPY phase is done; make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_COPYDONE,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+copy_table_done:
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9271f87..a60e9fd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -771,8 +771,7 @@ apply_handle_prepare_txn(LogicalRepPrepareData *prepare_data)
 
 	Assert(prepare_data->prepare_lsn == remote_final_lsn);
 
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * BeginTransactionBlock is necessary to balance the
@@ -1079,12 +1078,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -1161,9 +1156,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -1190,8 +1183,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1350,8 +1342,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index acc2926..e9f2b3f 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_COPYDONE	'C' /* tablesync copy phase is completed */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 804e47b..82c09d1 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -27,3 +27,4 @@ extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
+
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab69..366a737 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -211,6 +211,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

