On 27/06/17 10:51, Masahiko Sawada wrote:
> On Mon, Jun 26, 2017 at 12:12 PM, Masahiko Sawada <sawada.m...@gmail.com> 
> wrote:
> 
> I've reviewed this patch briefly.

Thanks!

> 
> @@ -515,6 +533,31 @@ logicalrep_worker_stop(Oid subid, Oid relid)
>  }
> 
>  /*
> + * Request worker to be stopped on commit.
> + */
> +void
> +logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
> +{
> +       LogicalRepWorkerId *wid;
> +       MemoryContext old;
> +
> +       old = MemoryContextSwitchTo(TopTransactionContext);
> +
> +       wid = (LogicalRepWorkerId *) palloc(sizeof(LogicalRepWorkerId));
> +
> +       /*
> +       wid = MemoryContextAlloc(TopTransactionContext,
> +                                sizeof(LogicalRepWorkerId));
> +       */
> +       wid->subid = subid;
> +       wid->relid = relid;
> +
> +       on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
> +
> +       MemoryContextSwitchTo(old);
> +}
> 
> logicalrep_worker_stop_at_commit() has a problem: new_list(), called
> by lappend(), allocates memory from the current memory context. It
> should switch the memory context first and then allocate the memory
> for wid and append it to the list.
> 

Right, fixed (I see you did that locally as well based on the above
excerpt ;) ).
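
For reference, the fixed version in the attached patch switches to
TopTransactionContext before both the palloc() and the lappend(), so the
list cells built by lappend() also survive until commit:

	oldctx = MemoryContextSwitchTo(TopTransactionContext);

	wid = palloc(sizeof(LogicalRepWorkerId));
	wid->subid = subid;
	wid->relid = relid;

	/* lappend() allocates its list cells in TopTransactionContext too. */
	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);

	MemoryContextSwitchTo(oldctx);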

> --------
> @@ -754,9 +773,25 @@ ApplyLauncherShmemInit(void)
>  void
>  AtEOXact_ApplyLauncher(bool isCommit)
>  {
> -       if (isCommit && on_commit_launcher_wakeup)
> -               ApplyLauncherWakeup();
> +       ListCell *lc;
> +
> +       if (isCommit)
> +       {
> +               foreach (lc, on_commit_stop_workers)
> +               {
> +                       LogicalRepWorkerId *wid = lfirst(lc);
> +                       logicalrep_worker_stop(wid->subid, wid->relid);
> +               }
> +
> +               if (on_commit_launcher_wakeup)
> +                       ApplyLauncherWakeup();
> 
> Stopping the logical rep worker in AtEOXact_ApplyLauncher doesn't
> support prepared transactions. Since we allocate the
> on_commit_stop_workers list in TopTransactionContext, postgres crashes
> if we execute any query after a prepared transaction that removes
> subscription relation state. Also, even after fixing that, we still
> need to do something: the on_commit_stop_workers list is not
> associated with the prepared transaction.  A query following "PREPARE
> TRANSACTION" tries to stop the workers at its own commit. There was a
> similar discussion before.
> 

Hmm, good point. I think for now it makes sense to simply disallow
PREPARE for transactions that manipulate workers, similar to what we do
when there are exported snapshots. Done that way in the attached patch.
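
For the record, the check added to PrepareTransaction() in the attached
patch mirrors the existing exported-snapshots check:

	if (XactManipulatesLogicalReplicationWorkers())
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));

where XactManipulatesLogicalReplicationWorkers() simply reports whether
on_commit_stop_workers is non-empty.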

> --------
> +
> +                               ensure_transaction();
> +                               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                          rstate->relid, rstate->state,
> +                                                          rstate->lsn);
> 
> 
> Should we commit the transaction if we started a new transaction
> before updating the subscription relation state? Otherwise it could be
> a deadlock risk.

We only lock the whole subscription (and the only conflicting things
are DROP and ALTER SUBSCRIPTION), not the individual subscription-relation
mapping, so it doesn't seem to me like there is any higher risk of
deadlocks than with anything else that works with multiple tables (or
compared to the previous behavior).
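
Concretely, AddSubscriptionRelState()/UpdateSubscriptionRelState() in the
attached patch only take a share lock on the subscription object:

	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);

and AccessShareLock conflicts only with the AccessExclusiveLock that ALTER
SUBSCRIPTION and DROP SUBSCRIPTION take on the same object, so concurrent
workers updating different relation states don't block each other there.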

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 88da433110aa3bde3dcce33ebf62d41a08c191b9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sat, 24 Jun 2017 19:38:21 +0200
Subject: [PATCH] Rework subscription worker and relation status handling

---
 src/backend/access/transam/xact.c           |   9 +
 src/backend/catalog/pg_subscription.c       | 137 ++++++------
 src/backend/commands/subscriptioncmds.c     |  98 ++++-----
 src/backend/replication/logical/launcher.c  | 309 ++++++++++++++++------------
 src/backend/replication/logical/tablesync.c |  97 +++++----
 src/backend/replication/logical/worker.c    |  23 ++-
 src/backend/utils/cache/catcache.c          |   6 +-
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 src/include/replication/logicallauncher.h   |   1 +
 src/include/replication/worker_internal.h   |   6 +-
 10 files changed, 393 insertions(+), 299 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69f..322502d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+	/*
+	 * Similar to above, don't allow PREPARE for a transaction that has
+	 * manipulated logical replication workers.
+	 */
+	if (XactManipulatesLogicalReplicationWorkers())
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
 
 #include "nodes/makefuncs.h"
 
+#include "storage/lmgr.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -225,84 +227,101 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -377,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -400,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		CatalogTupleDelete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+			AddSubscriptionRelState(sub->oid, relid,
+						  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
 			namespace = get_namespace_name(get_rel_namespace(relid));
 			ereport(NOTICE,
@@ -636,14 +636,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
 	/* Form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -811,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
+	List	   *subworkers;
+	ListCell   *lc;
 	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
 	RepOriginId originid;
+	char	   *err = NULL;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
@@ -826,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
 	 * launcher doesn't restart new worker during dropping the subscription
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -858,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	sub = GetSubscription(subid, false);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -894,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -906,15 +887,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * If we are dropping the slot, stop all the subscription workers
+	 * immediately so that the slot is accessible; otherwise just schedule
+	 * the stop at the end of the transaction.
+	 *
+	 * New workers won't be started because we hold an exclusive lock on the
+	 * subscription until the end of the transaction.
+	 */
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (sub->slotname)
+			logicalrep_worker_stop(w->subid, w->relid);
+		else
+			logicalrep_worker_stop_at_commit(w->subid, w->relid);
+	}
+	list_free(subworkers);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
@@ -925,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -938,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -958,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14..01ef614 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,12 +42,14 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
 
 #include "tcop/tcopprot.h"
 
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
@@ -73,6 +75,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+	Oid	subid;
+	Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -86,12 +96,11 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
  * Load the list of subscriptions.
  *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Only the fields interesting for worker start are filled for each
+ * subscription.
  */
 static List *
 get_subscription_list(void)
@@ -100,19 +109,13 @@ get_subscription_list(void)
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
 	 * We don't have a use for the snapshot itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -121,34 +124,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -250,23 +236,68 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Similar to logicalrep_worker_find(), but returns a list of all workers
+ * for the subscription instead of just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+	int			i;
+	List	   *res = NIL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Search for attached worker for a given subscription id. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid && (!only_running || w->proc))
+			res = lappend(res, w);
+	}
+
+	return res;
+}
+
+/*
+ * Start new logical replication background worker.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
 	int			i;
 	int			slot = 0;
-	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
+	List	   *subworkers;
+	ListCell   *lc;
 	TimestampTz now;
+	int			nsyncworkers = 0;
+	Subscription *sub;
+	LogicalRepWorker *worker = NULL;
 
 	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+			(errmsg("starting logical replication worker for subscription %u",
+					subid)));
+
+	/* Block any concurrent DDL on the subscription. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/*
+	 * The subscription might have been dropped in the meantime, so make sure
+	 * our cache is up to date.
+	 */
+	AcceptInvalidationMessages();
+
+	/* Get info about subscription. */
+	sub = GetSubscription(subid, true);
+	if (!sub)
+	{
+		ereport(DEBUG1,
+				(errmsg("subscription %u not found, not starting worker for it",
+						subid)));
+		return;
+	}
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -294,7 +325,14 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (w->relid != InvalidOid)
+			nsyncworkers ++;
+	}
+	list_free(subworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -340,6 +378,7 @@ retry:
 	if (nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		return;
 	}
 
@@ -350,6 +389,7 @@ retry:
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of logical replication worker slots"),
@@ -362,8 +402,8 @@ retry:
 	worker->in_use = true;
 	worker->generation++;
 	worker->proc = NULL;
-	worker->dbid = dbid;
-	worker->userid = userid;
+	worker->dbid = sub->dbid;
+	worker->userid = sub->owner;
 	worker->subid = subid;
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -374,8 +414,6 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
-	LWLockRelease(LogicalRepWorkerLock);
-
 	/* Register the new dynamic worker. */
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -394,8 +432,13 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
+	/* Try to register the worker and cleanup in case of failure. */
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of background worker slots"),
@@ -403,13 +446,24 @@ retry:
 		return;
 	}
 
+	/* Done with the worker array. */
+	LWLockRelease(LogicalRepWorkerLock);
+
 	/* Now wait until it attaches. */
 	WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+	/*
+	 * The worker either started or died; in any case we are done with the
+	 * subscription.
+	 */
+	UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 }
 
 /*
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
+ *
+ * Callers of this function should hold an exclusive lock on the subscription.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -417,7 +471,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 	uint16		generation;
 
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	/* Exclusive is needed for logicalrep_worker_cleanup(). */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
 	worker = logicalrep_worker_find(subid, relid, false);
 
@@ -428,56 +483,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/* If there is worker but it's not running, clean it up. */
+	if (!worker->proc)
+		logicalrep_worker_cleanup(worker);
+
 	/*
 	 * Remember which generation was our worker so we can check if what we see
 	 * is still the same one.
 	 */
 	generation = worker->generation;
 
-	/*
-	 * If we found worker but it does not have proc set it is starting up,
-	 * wait for it to finish and then kill it.
-	 */
-	while (worker->in_use && !worker->proc)
-	{
-		int			rc;
-
-		LWLockRelease(LogicalRepWorkerLock);
-
-		/* Wait for signal. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   1000L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-			proc_exit(1);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Check worker status. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/*
-		 * Check whether the worker slot is no longer used, which would mean
-		 * that the worker has exited, or whether the worker generation is
-		 * different, meaning that a different worker has taken the slot.
-		 */
-		if (!worker->in_use || worker->generation != generation)
-		{
-			LWLockRelease(LogicalRepWorkerLock);
-			return;
-		}
-
-		/* Worker has assigned proc, so it has started. */
-		if (worker->proc)
-			break;
-	}
-
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
 	LWLockRelease(LogicalRepWorkerLock);
@@ -497,7 +512,10 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Wait for more work. */
+		/*
+		 * We need a timeout because we generally don't get notified via
+		 * latch about the worker detach.
+		 */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -515,6 +533,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
+ * Request worker to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+	LogicalRepWorkerId *wid;
+	MemoryContext		oldctx;
+
+	/* Make sure we store the info in a context which survives until commit. */
+	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+	wid = palloc(sizeof(LogicalRepWorkerId));
+	wid->subid = subid;
+	wid->relid = relid;
+
+	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
  * Wake up (using latch) the logical replication worker.
  */
 void
@@ -648,30 +687,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
 }
 
 /*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
-	int			i;
-	int			res = 0;
-
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
-	return res;
-}
-
-/*
  * ApplyLauncherShmemSize
  *		Compute space needed for replication launcher shared memory
  */
@@ -749,14 +764,41 @@ ApplyLauncherShmemInit(void)
 }
 
 /*
+ * XactManipulatesLogicalReplicationWorkers
+ *		Check whether current transaction has manipulated logical replication
+ *		workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+	return (on_commit_stop_workers != NIL);
+}
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-	if (isCommit && on_commit_launcher_wakeup)
-		ApplyLauncherWakeup();
+	ListCell *lc;
+
+	if (isCommit)
+	{
+		foreach (lc, on_commit_stop_workers)
+		{
+			LogicalRepWorkerId *wid = lfirst(lc);
+			logicalrep_worker_stop(wid->subid, wid->relid);
+		}
+
+		if (on_commit_launcher_wakeup)
+			ApplyLauncherWakeup();
+	}
 
+	/*
+	 * No need to pfree on_commit_stop_workers; it has been allocated in the
+	 * transaction memory context, which is going to be cleaned up soon.
+	 */
+	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
@@ -814,8 +856,6 @@ ApplyLauncherMain(Datum main_arg)
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -827,41 +867,38 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
+			/*
+			 * Start a new transaction so that we can take locks and snapshots.
+			 *
+			 * Any allocations will also be made inside the transaction memory
+			 * context.
+			 */
+			StartTransactionCommand();
+
+			/* Search for subscriptions to start. */
 			sublist = get_subscription_list();
 
-			/* Start the missing workers for enabled subscriptions. */
+			/* Start the missing workers. */
 			foreach(lc, sublist)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
+				Oid	subid = lfirst_oid(lc);
 				LogicalRepWorker *w;
 
 				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+				w = logicalrep_worker_find(subid, InvalidOid, false);
 				LWLockRelease(LogicalRepWorkerLock);
 
-				if (sub->enabled && w == NULL)
+				if (w == NULL)
 				{
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+					/* Start the worker. */
+					logicalrep_worker_launch(subid, InvalidOid);
 				}
 			}
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			CommitTransactionCommand();
 		}
 		else
 		{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12df..11f4977 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -215,7 +215,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  * Returns false if the apply worker has disappeared or the table state has been
  * reset.
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -232,10 +232,13 @@ wait_for_worker_state_change(char expected_state)
 										InvalidOid, false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			return false;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -247,8 +250,6 @@ wait_for_worker_state_change(char expected_state)
 
 		ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -285,11 +286,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -332,6 +332,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 
+#define ensure_transaction() \
+	if (!started_tx) \
+	{\
+		StartTransactionCommand(); \
+		started_tx = true; \
+	}
+
 	Assert(!IsTransactionState());
 
 	/* We need up-to-date sync state info for subscription tables here. */
@@ -346,8 +353,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -409,14 +415,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+
+				ensure_transaction();
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -435,13 +438,26 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				SpinLockRelease(&syncworker->relmutex);
 			}
 			else
+			{
+				List	   *subworkers;
+				ListCell   *lc;
 
 				/*
 				 * If there is no sync worker for this table yet, count
 				 * running sync workers for this subscription, while we have
 				 * the lock, for later.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				subworkers =
+					logicalrep_sub_workers_find(MyLogicalRepWorker->subid,
+												false);
+				foreach (lc, subworkers)
+				{
+					LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+					if (w->relid != InvalidOid)
+						nsyncworkers ++;
+				}
+				list_free(subworkers);
+			}
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
@@ -467,11 +483,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * Enter busy loop and wait for synchronization worker to
 				 * reach expected state (or die trying).
 				 */
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
+				ensure_transaction();
 				wait_for_relation_state_change(rstate->relid,
 											   SUBREL_STATE_SYNCDONE);
 			}
@@ -493,10 +505,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					TimestampDifferenceExceeds(hentry->last_start_time, now,
 											   wal_retrieve_retry_interval))
 				{
-					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-											 MySubscription->oid,
-											 MySubscription->name,
-											 MyLogicalRepWorker->userid,
+					ensure_transaction();
+					logicalrep_worker_launch(MySubscription->oid,
 											 rstate->relid);
 					hentry->last_start_time = now;
 				}
@@ -798,6 +808,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 									   MyLogicalRepWorker->relid,
 									   &relstate_lsn, true);
+	if (relstate == SUBREL_STATE_UNKNOWN)
+	{
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", "
+						"table \"%s\" will stop because the table is no longer subscribed",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid))));
+		proc_exit(0);
+	}
 	CommitTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -844,11 +863,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
-										MyLogicalRepWorker->relstate,
-										MyLogicalRepWorker->relstate_lsn,
-										true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   MyLogicalRepWorker->relstate,
+										   MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -933,11 +951,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
 					 */
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											MyLogicalRepWorker->relid,
-											SUBREL_STATE_SYNCDONE,
-											*origin_startpos,
-											true);
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   SUBREL_STATE_SYNCDONE,
+											   *origin_startpos);
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 898c497..085dd8c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1516,24 +1516,31 @@ ApplyWorkerMain(Datum main_arg)
 										 ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	if (!MySubscription)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription %u will "
+						"stop because the subscription was removed",
+						MyLogicalRepWorker->subid)));
+		proc_exit(0);
+	}
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
 
-	/* Setup synchronous commit according to the user's wishes */
-	SetConfigOption("synchronous_commit", MySubscription->synccommit,
-					PGC_BACKEND, PGC_S_OVERRIDE);
-
 	if (!MySubscription->enabled)
 	{
 		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will not "
-						"start because the subscription was disabled during startup",
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
 						MySubscription->name)));
-
 		proc_exit(0);
 	}
 
+	/* Setup synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit", MySubscription->synccommit,
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
 	/* Keep us informed about subscription changes. */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
 		case AUTHNAME:
 		case AUTHOID:
 		case AUTHMEMMEMROLE:
+		case SUBSCRIPTIONOID:
+		case SUBSCRIPTIONNAME:
 
 			/*
-			 * Protect authentication lookups occurring before relcache has
-			 * collected entries for shared indexes.
+			 * Protect authentication and subscription lookups occurring
+			 * before relcache has collected entries for shared indexes.
 			 */
 			if (!criticalSharedRelcachesBuilt)
 				return false;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d32..78016c4 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,13 +71,13 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 					   bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-						 Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-extern int	logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
 
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
-- 
2.7.4
