From b93e330a1ffe5c43d09f72e38726363bcb49d890 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 30 Dec 2020 15:02:21 +1100
Subject: [PATCH v9] WIP patch for the Solution1.

====

Features:

* tablesync slot is now permanent instead of temporary. The tablesync slot name is no longer tied to the Subscription slot name.

* the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions

* the tablesync worker now allows multiple transactions instead of a single transaction

* a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart.

* if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase.

* tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar to what is done for the apply worker). The origin is advanced when first created.

* tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply.

* The DropSubscription cleanup code was enhanced in v7 to take care of crashed sync workers.

* Minor updates to PG docs

TODO / Known Issues:

* Source includes temporary "!!>>" excessive logging which I added to help testing during development

* Address review comments
---
 doc/src/sgml/catalogs.sgml                  |   1 +
 src/backend/commands/subscriptioncmds.c     | 222 +++++++++++++++++++-------
 src/backend/replication/logical/origin.c    |   4 +-
 src/backend/replication/logical/tablesync.c | 232 ++++++++++++++++++++++++----
 src/backend/replication/logical/worker.c    |  18 +--
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/replication/slot.h              |   3 +
 7 files changed, 376 insertions(+), 105 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d988636..266615c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7651,6 +7651,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        State code:
        <literal>i</literal> = initialize,
        <literal>d</literal> = data is being copied,
+       <literal>C</literal> = table data has been copied,
        <literal>s</literal> = synchronized,
        <literal>r</literal> = ready (normal replication)
       </para></entry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 1696454..c366614 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -928,7 +929,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *err = NULL;
 	RepOriginId originid;
 	WalReceiverConn *wrconn = NULL;
-	StringInfoData cmd;
 	Form_pg_subscription form;
 
 	/*
@@ -1016,76 +1016,188 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ReleaseSysCache(tup);
 
 	/*
-	 * Stop all the subscription workers immediately.
-	 *
-	 * This is necessary if we are dropping the replication slot, so that the
-	 * slot becomes accessible.
+	 * Try to acquire the connection necessary for dropping slots.
+	 * We do this here so that the same connection may be shared
+	 * for dropping the Subscription slot, as well as dropping any
+	 * tablesync slots.
 	 *
-	 * It is also necessary if the subscription is disabled and was disabled
-	 * in the same transaction.  Then the workers haven't seen the disabling
-	 * yet and will still be running, leading to hangs later when we want to
-	 * drop the replication origin.  If the subscription was disabled before
-	 * this transaction, then there shouldn't be any workers left, so this
-	 * won't make a difference.
-	 *
-	 * New workers won't be started because we hold an exclusive lock on the
-	 * subscription till the end of the transaction.
+	 * Note: If the slotname is NONE/NULL then connection errors are
+	 * suppressed. This is necessary so that the DROP SUBSCRIPTION
+	 * can still complete even when the connection to publisher is
+	 * broken.
 	 */
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-	subworkers = logicalrep_workers_find(subid, false);
-	LWLockRelease(LogicalRepWorkerLock);
-	foreach(lc, subworkers)
+	load_file("libpqwalreceiver", false);
+
+	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	if (wrconn == NULL && slotname != NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to publisher when attempting to "
+						"drop the replication slot \"%s\"", slotname),
+				 errdetail("The error was: %s", err),
+		/* translator: %s is an SQL ALTER command */
+				 errhint("Use %s to disassociate the subscription from the slot.",
+						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+
+	PG_TRY();
 	{
-		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		/*
+		 * Stop all the subscription workers immediately.
+		 *
+		 * This is necessary if we are dropping the replication slot, so that the
+		 * slot becomes accessible.
+		 *
+		 * It is also necessary if the subscription is disabled and was disabled
+		 * in the same transaction.  Then the workers haven't seen the disabling
+		 * yet and will still be running, leading to hangs later when we want to
+		 * drop the replication origin.  If the subscription was disabled before
+		 * this transaction, then there shouldn't be any workers left, so this
+		 * won't make a difference.
+		 *
+		 * New workers won't be started because we hold an exclusive lock on the
+		 * subscription till the end of the transaction.
+		 */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		subworkers = logicalrep_workers_find(subid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		foreach(lc, subworkers)
+		{
+			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
-	}
-	list_free(subworkers);
+			logicalrep_worker_stop(w->subid, w->relid);
+		}
+		list_free(subworkers);
 
-	/* Clean up dependencies */
-	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+		/*
+		 * Tablesync resource cleanup (slots and origins).
+		 *
+		 * Any READY-state relations would already have dealt with clean-ups.
+		 */
+		{
+			List	   *rstates;
+			ListCell   *lc;
 
-	/* Remove any associated relation synchronization states. */
-	RemoveSubscriptionRel(subid, InvalidOid);
+			rstates = GetSubscriptionNotReadyRelations(subid);
+			foreach(lc, rstates)
+			{
+				SubscriptionRelState   *rstate = (SubscriptionRelState *) lfirst(lc);
+				Oid						relid = rstate->relid;
 
-	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid, false);
+				/* Only cleanup the tablesync worker resources */
+				if (!OidIsValid(relid))
+					continue;
 
-	/*
-	 * If there is no slot associated with the subscription, we can finish
-	 * here.
-	 */
-	if (!slotname)
+				/* Drop the tablesync slot. */
+				{
+					char *syncslotname = ReplicationSlotNameForTablesync(subid, relid);
+
+					/*
+					 * If the subscription slotname is NONE/NULL and the connection to the
+					 * publisher is broken, DropSubscription should still be allowed to complete.
+					 * But without a connection it is not possible to drop any tablesync slots.
+					 */
+					if (!wrconn)
+					{
+						/* FIXME - OK to just log a warning? */
+						elog(WARNING, "!!>> DropSubscription: no connection. Cannot drop tablesync slot \"%s\".",
+									  syncslotname);
+					}
+					else
+					{
+						PG_TRY();
+						{
+							elog(LOG, "!!>> DropSubscription: dropping the tablesync slot \"%s\".", syncslotname);
+							ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+							elog(LOG, "!!>> DropSubscription: dropped the tablesync slot \"%s\".", syncslotname);
+						}
+						PG_CATCH();
+						{
+							/*
+							 * Typically tablesync will delete its own slot after it reaches
+							 * SYNCDONE state. Then the apply worker moves the tablesync from
+							 * SYNCDONE to READY state.
+							 *
+							 * Rarely, the DropSubscription may be issued while a tablesync
+							 * is still in SYNCDONE state and has not yet reached READY state.
+							 * If this happens then the drop slot could fail since the slot
+							 * was already dropped, so suppress the error.
+							 */
+							if (rstate->state != SUBREL_STATE_SYNCDONE)
+							{
+								pfree(syncslotname);
+								PG_RE_THROW();
+							}
+						}
+						PG_END_TRY();
+					}
+					pfree(syncslotname);
+				}
+
+				/* Remove the tablesync's origin tracking if exists. */
+				{
+					snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+					originid = replorigin_by_name(originname, true);
+					if (originid != InvalidRepOriginId)
+					{
+						elog(LOG, "!!>> DropSubscription: dropping origin tracking for \"%s\"", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname);
+					}
+				}
+
+			}
+			list_free(rstates);
+		}
+
+		/* Clean up dependencies */
+		deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+
+		/* Remove any associated relation synchronization states. */
+		RemoveSubscriptionRel(subid, InvalidOid);
+
+		/* Remove the origin tracking if exists. */
+		snprintf(originname, sizeof(originname), "pg_%u", subid);
+		originid = replorigin_by_name(originname, true);
+		if (originid != InvalidRepOriginId)
+			replorigin_drop(originid, false);
+
+		/*
+		 * If there is a slot associated with the subscription, then drop the
+		 * replication slot at the publisher node using the replication
+		 * connection.
+		 */
+		if (slotname)
+			ReplicationSlotDropAtPubNode(wrconn, slotname);
+	}
+	PG_FINALLY();
 	{
+		if (wrconn)
+			walrcv_disconnect(wrconn);
+
 		table_close(rel, NoLock);
-		return;
 	}
+	PG_END_TRY();
+}
+
+
+/*
+ * Drop the replication slot at the publisher node
+ * using the replication connection.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname)
+{
+	StringInfoData	cmd;
+
+	Assert(wrconn);
 
-	/*
-	 * Otherwise drop the replication slot at the publisher node using the
-	 * replication connection.
-	 */
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
-	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err),
-		/* translator: %s is an SQL ALTER command */
-				 errhint("Use %s to disassociate the subscription from the slot.",
-						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
-
 	PG_TRY();
 	{
-		WalRcvExecResult *res;
+		WalRcvExecResult   *res;
 
 		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 
@@ -1103,13 +1215,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	PG_FINALLY();
 	{
-		walrcv_disconnect(wrconn);
+		pfree(cmd.data);
 	}
 	PG_END_TRY();
-
-	pfree(cmd.data);
-
-	table_close(rel, NoLock);
 }
 
 /*
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 15ab8e7..6b79dc6 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -843,7 +843,7 @@ replorigin_redo(XLogReaderState *record)
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
  * setting up replorigin_session_origin_lsn and replorigin_session_origin
- * that ensures we won't loose knowledge about that after a crash if the
+ * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
  * local_commit needs to be a local LSN of the commit so that we can make sure
@@ -905,7 +905,7 @@ replorigin_advance(RepOriginId node,
 		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
 
 		/* Make sure it's not used by somebody else */
-		if (replication_state->acquired_by != 0)
+		if (replication_state->acquired_by != 0 && replication_state->acquired_by != MyProcPid)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6259606..8180f49 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -43,13 +43,17 @@
  *		 state to SYNCDONE.  There might be zero changes applied between
  *		 CATCHUP and SYNCDONE, because the sync worker might be ahead of the
  *		 apply worker.
+ *	   - The sync worker has an intermediary state COPYDONE which comes after
+ *		CATCHUP and before SYNCDONE. This state indicates that the initial
+ *		table copy phase has completed, so if the worker crashes before
+ *		reaching SYNCDONE the copy will not be re-attempted.
  *	   - Once the state is set to SYNCDONE, the apply will continue tracking
  *		 the table until it reaches the SYNCDONE stream position, at which
  *		 point it sets state to READY and stops tracking.  Again, there might
  *		 be zero changes in between.
  *
  *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- *	  CATCHUP -> SYNCDONE -> READY.
+ *	  CATCHUP -> (sync worker COPYDONE) -> SYNCDONE -> READY.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state.  Some transient state during data
@@ -64,6 +68,7 @@
  *			-> set in memory CATCHUP
  *			-> enter wait-loop
  *		  sync:10
+ *			-> set in catalog COPYDONE
  *			-> set in catalog SYNCDONE
  *			-> exit
  *		  apply:10
@@ -79,6 +84,7 @@
  *			-> set in memory CATCHUP
  *			-> continue per-table filtering
  *		  sync:10
+ *			-> set in catalog COPYDONE
  *			-> set in catalog SYNCDONE
  *			-> exit
  *		  apply:10
@@ -102,6 +108,8 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -139,6 +147,33 @@ finish_sync_worker(void)
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/*
+	 * Cleanup the tablesync slot.
+	 */
+	{
+		/* Calculate the name of the tablesync slot */
+		char *syncslotname = ReplicationSlotNameForTablesync(
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
+
+		PG_TRY();
+		{
+			elog(LOG, "!!>> finish_sync_worker: dropping the tablesync slot \"%s\".", syncslotname);
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+			elog(LOG, "!!>> finish_sync_worker: dropped the tablesync slot \"%s\".", syncslotname);
+		}
+		PG_CATCH();
+		{
+			/*
+			 * NOP. Suppress any drop slot error because otherwise
+			 * it would cause the tablesync to fail and re-launch.
+			 */
+		}
+		PG_END_TRY();
+
+		pfree(syncslotname);
+	}
+
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
@@ -270,8 +305,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
-
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -284,6 +317,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+		 * UpdateSubscriptionRelState must be called within a transaction.
+		 * That transaction will be ended within the finish_sync_worker().
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -406,12 +448,41 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
+
 				if (!started_tx)
 				{
 					StartTransactionCommand();
 					started_tx = true;
 				}
 
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The cleanup is done here instead of in the finish_sync_worker function because
+				 * if the tablesync worker process attempted to call replorigin_drop then that will
+				 * hang because the replorigin_drop considers the owning tablesync PID as "busy".
+				 *
+				 * Do this before updating the state, so that DropSubscription can know that all
+				 * READY workers have already had their origin tracking removed.
+				 */
+				{
+					char        originname[NAMEDATALEN];
+					RepOriginId originid;
+
+					snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+					originid = replorigin_by_name(originname, true);
+					elog(LOG, "!!>> apply worker: find tablesync origin tracking for \"%s\".", originname);
+					if (OidIsValid(originid))
+					{
+						elog(LOG, "!!>> apply worker: dropping tablesync origin tracking for \"%s\".", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> apply worker: dropped tablesync origin tracking for \"%s\".", originname);
+					}
+				}
+
+				/*
+				 * Update the state only after the origin cleanup.
+				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
@@ -807,6 +878,32 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+/*
+ * Determine the tablesync slot name, "pg_<suboid>_sync_<relid>".
+ *
+ * The returned slot name is palloc'ed in the current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid)
+{
+	char *syncslotname;
+
+	/*
+	 * A slot name for the sync worker is limited to NAMEDATALEN - 1
+	 * characters.
+	 *
+	 * The name is built as pg_%u_sync_%u: at most 3 + 10 + 6 + 10 characters
+	 * plus '\0'. (It's actually the NAMEDATALEN on the remote that matters,
+	 * but this scheme will also work reasonably if that is different.)
+	 */
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
+
+	syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -849,17 +946,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
 						MySubscription->oid,
 						MyLogicalRepWorker->relid);
 
@@ -874,7 +962,19 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed/etc
+		 * before it was able to finish normally.
+		 */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_COPYDONE.");
+		StartTransactionCommand();
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -890,9 +990,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -918,29 +1015,98 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot.  This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
+	elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname);
+	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
+	}
+	PG_CATCH();
+	{
+		/* If something failed during copy table then cleanup the created slot. */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname);
+		ReplicationSlotDropAtPubNode(wrconn, slotname);
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", slotname);
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		pfree(slotname);
+		slotname = NULL;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* Update the persisted state to indicate the COPY phase is done; make it visible to others. */
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_COPYDONE,
+							   MyLogicalRepWorker->relstate_lsn);
+
+copy_table_done:
+
+	/* Setup replication origin tracking. */
+	{
+		char		originname[NAMEDATALEN];
+		RepOriginId originid;
+
+		snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid))
+		{
+			/*
+			 * Origin tracking does not exist. Create it now, and advance to the LSN obtained from walrcv_create_slot.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname);
+			originid = replorigin_create(originname);
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname);
+			replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+							   true /* go backward */ , true /* WAL log */ );
+		}
+		else
+		{
+			/*
+			 * Origin tracking already exists.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname);
+			*origin_startpos = replorigin_session_get_progress(false);
+		}
 
-	table_close(rel, NoLock);
+		elog(LOG, "!!>> LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+				   originname,
+				   (uint32) (*origin_startpos >> 32),
+				   (uint32) *origin_startpos);
+	}
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+	CommitTransactionCommand();
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3874939..d28cfb8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index acc2926..e9f2b3f 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_COPYDONE	'C' /* tablesync copy phase is completed */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab69..5f19089 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

