From 35d700dabbb583a2496b16ad7bb5a49a54d97f0d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 17 Aug 2017 14:13:29 -0700
Subject: [PATCH 4/4] Improve locking for subscriptions and subscribed relations

---
 src/backend/commands/subscriptioncmds.c     |   57 ++-----
 src/backend/replication/logical/launcher.c  |  228 +++++++++++----------------
 src/backend/replication/logical/tablesync.c |   63 +++++---
 src/include/replication/worker_internal.h   |    5 +-
 4 files changed, 145 insertions(+), 208 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 354d037..85b97c3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -633,12 +633,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
 	/* Lock the subscription so nobody else can do anything with it. */
@@ -811,16 +812,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
 	List	   *subworkers;
 	ListCell   *lc;
 	char		originname[NAMEDATALEN];
 	char	   *err = NULL;
 	RepOriginId originid;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
@@ -828,7 +825,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
 	 * launcher doesn't restart new worker during dropping the subscription
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -860,31 +857,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	sub = GetSubscription(subid, false);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -896,7 +872,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -923,7 +899,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		if (slotname)
+		if (sub->slotname)
 			logicalrep_worker_stop(w->subid, w->relid);
 		else
 			logicalrep_worker_stop_at_commit(w->subid, w->relid);
@@ -946,7 +922,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -959,13 +935,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -979,12 +956,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6c89442..e9bc9af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,6 +42,7 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
@@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
- * Load the list of subscriptions.
- *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Load the list of emabled subscription oids.
  */
 static List *
-get_subscription_list(void)
+get_subscription_oids(void)
 {
 	List	   *res = NIL;
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
 	 * We don't have a use for the snapshot itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -129,34 +120,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -282,23 +256,38 @@ logicalrep_workers_find(Oid subid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Start new logical replication background worker.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
 	int			i;
 	int			slot = 0;
-	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
+	List	   *subworkers;
+	ListCell   *lc;
 	TimestampTz now;
+	int			nsyncworkers = 0;
+	Subscription *sub;
+	LogicalRepWorker *worker = NULL;
 
 	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+			(errmsg("starting logical replication worker for subscription %u",
+					subid)));
+
+	/* Block any concurrent DDL on the subscription. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/* Get info about subscription. */
+	sub = GetSubscription(subid, true);
+	if (!sub)
+	{
+		ereport(DEBUG1,
+				(errmsg("subscription %u not found, not starting worker for it",
+						subid)));
+		return;
+	}
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -326,7 +315,14 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	subworkers = logicalrep_workers_find(subid, false);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (w->relid != InvalidOid)
+			nsyncworkers ++;
+	}
+	list_free(subworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -372,6 +368,7 @@ retry:
 	if (nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		return;
 	}
 
@@ -382,6 +379,7 @@ retry:
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of logical replication worker slots"),
@@ -394,8 +392,8 @@ retry:
 	worker->in_use = true;
 	worker->generation++;
 	worker->proc = NULL;
-	worker->dbid = dbid;
-	worker->userid = userid;
+	worker->dbid = sub->dbid;
+	worker->userid = sub->owner;
 	worker->subid = subid;
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -406,8 +404,6 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
-	LWLockRelease(LogicalRepWorkerLock);
-
 	/* Register the new dynamic worker. */
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -426,8 +422,13 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
+	/* Try to register the worker and cleanup in case of failure. */
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of background worker slots"),
@@ -435,13 +436,24 @@ retry:
 		return;
 	}
 
+	/* Done with the worker array. */
+	LWLockRelease(LogicalRepWorkerLock);
+
 	/* Now wait until it attaches. */
 	WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+	/*
+	 * Worker either started or died, in any case we are done with the
+	 * subscription.
+	 */
+	UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 }
 
 /*
  * Stop the logical replication worker for subid/relid, if any, and wait until
  * it detaches from the slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -449,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 	uint16		generation;
 
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	/* Exclusive is needed for logicalrep_worker_cleanup(). */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
 	worker = logicalrep_worker_find(subid, relid, false);
 
@@ -460,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/* If there is worker but it's not running, clean it up. */
+	if (!worker->proc)
+	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
 	/*
 	 * Remember which generation was our worker so we can check if what we see
 	 * is still the same one.
 	 */
 	generation = worker->generation;
 
-	/*
-	 * If we found a worker but it does not have proc set then it is still
-	 * starting up; wait for it to finish starting and then kill it.
-	 */
-	while (worker->in_use && !worker->proc)
-	{
-		int			rc;
-
-		LWLockRelease(LogicalRepWorkerLock);
-
-		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-			proc_exit(1);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Recheck worker status. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/*
-		 * Check whether the worker slot is no longer used, which would mean
-		 * that the worker has exited, or whether the worker generation is
-		 * different, meaning that a different worker has taken the slot.
-		 */
-		if (!worker->in_use || worker->generation != generation)
-		{
-			LWLockRelease(LogicalRepWorkerLock);
-			return;
-		}
-
-		/* Worker has assigned proc, so it has started. */
-		if (worker->proc)
-			break;
-	}
-
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
 
@@ -539,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/*
+		 * Shared lock is enough for the loop as we don't need to do the slot
+		 * cleanup because at this point we know that the worker has attached
+		 * to the shmem and will clean the slot on detach automatically.
+		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 	}
 
@@ -706,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
 }
 
 /*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
-	int			i;
-	int			res = 0;
-
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
-	return res;
-}
-
-/*
  * ApplyLauncherShmemSize
  *		Compute space needed for replication launcher shared memory
  */
@@ -899,8 +857,6 @@ ApplyLauncherMain(Datum main_arg)
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -912,28 +868,29 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
+			/*
+			 * Start new transaction so that we can take locks and snapshots.
+			 *
+			 * Any allocations will also be made inside the transaction memory
+			 * context.
+			 */
+			StartTransactionCommand();
+
+			/* Search for subscriptions to start. */
+			sublist = get_subscription_oids();
+
+			/* Start the missing workers. */
 			foreach(lc, sublist)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
+				Oid	subid = lfirst_oid(lc);
+				Subscription *sub = GetSubscription(subid, false);
 				LogicalRepWorker *w;
 
 				if (!sub->enabled)
 					continue;
 
 				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+				w = logicalrep_worker_find(subid, InvalidOid, false);
 				LWLockRelease(LogicalRepWorkerLock);
 
 				if (w == NULL)
@@ -941,15 +898,12 @@ ApplyLauncherMain(Datum main_arg)
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+					/* Start the worker. */
+					logicalrep_worker_launch(subid, InvalidOid);
 				}
 			}
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			CommitTransactionCommand();
 		}
 		else
 		{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 42460b3..fdc3515 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,12 +102,13 @@
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
-#include "utils/snapmgr.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 
 static bool table_states_valid = false;
 
@@ -211,10 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  * worker to the expected one.
  *
  * Used when transitioning from SYNCWAIT state to CATCHUP.
- *
- * Returns false if the apply worker has disappeared.
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -230,7 +229,7 @@ wait_for_worker_state_change(char expected_state)
 		 * enough to not give a misleading answer if we do it with no lock.)
 		 */
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		/*
 		 * Bail out if the apply worker has died, else signal it we're
@@ -243,7 +242,10 @@ wait_for_worker_state_change(char expected_state)
 			logicalrep_worker_wakeup_ptr(worker);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			break;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		/*
 		 * Wait.  We expect to get a latch signal back from the apply worker,
@@ -260,8 +262,6 @@ wait_for_worker_state_change(char expected_state)
 		if (rc & WL_LATCH_SET)
 			ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -346,6 +346,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+#define ensure_transaction_and_lock() \
+	if (!started_tx) \
+	{\
+		StartTransactionCommand(); \
+		LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \
+						 AccessShareLock); \
+		started_tx = true; \
+	}
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
@@ -358,8 +367,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction_and_lock();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -421,11 +429,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
+
+				ensure_transaction_and_lock();
 
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
@@ -476,12 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					 * Enter busy loop and wait for synchronization worker to
 					 * reach expected state (or die trying).
 					 */
-					if (!started_tx)
-					{
-						StartTransactionCommand();
-						started_tx = true;
-					}
-
+					ensure_transaction_and_lock();
 					wait_for_relation_state_change(rstate->relid,
 												   SUBREL_STATE_SYNCDONE);
 				}
@@ -495,8 +495,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * running sync workers for this subscription, while we have
 				 * the lock.
 				 */
-				int			nsyncworkers =
-				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				List	   *subworkers;
+				ListCell   *lc;
+				int			nsyncworkers = 0;
+
+				subworkers = logicalrep_workers_find(MySubscription->oid,
+														 false);
+				foreach (lc, subworkers)
+				{
+					LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+					if (w->relid != InvalidOid)
+						nsyncworkers ++;
+				}
+				list_free(subworkers);
 
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
@@ -518,10 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-												 MySubscription->oid,
-												 MySubscription->name,
-												 MyLogicalRepWorker->userid,
+						ensure_transaction_and_lock();
+						logicalrep_worker_launch(MySubscription->oid,
 												 rstate->relid);
 						hentry->last_start_time = now;
 					}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7b8728c..c82396d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -72,15 +72,12 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 					   bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-						 Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-extern int	logicalrep_sync_worker_count(Oid subid);
-
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
 void invalidate_syncing_table_states(Datum arg, int cacheid,
-- 
1.7.1

