From b7a9ab4ed1e6da6cebf9d8fa48f834369b583f14 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 9 Jul 2023 03:56:16 +0000
Subject: [PATCH v6] Synchronize logical replication slots from primary to
 standby

---
 doc/src/sgml/config.sgml                      |  34 ++
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/bgworker.c             |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  95 ++++
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    | 263 +++++++----
 src/backend/replication/logical/meson.build   |   1 +
 .../replication/logical/reorderbuffer.c       |  86 ++++
 src/backend/replication/logical/slotsync.c    | 413 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  13 +-
 src/backend/replication/logical/worker.c      |   3 +-
 src/backend/replication/repl_gram.y           |  32 +-
 src/backend/replication/repl_scanner.l        |   2 +
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           | 195 +++++++++
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  26 ++
 src/backend/utils/misc/postgresql.conf.sample |   3 +
 src/include/commands/subscriptioncmds.h       |   3 +
 src/include/nodes/replnodes.h                 |   9 +
 src/include/replication/logicallauncher.h     |   2 +
 src/include/replication/logicalworker.h       |   9 +
 src/include/replication/slot.h                |   5 +-
 src/include/replication/walreceiver.h         |  20 +
 src/include/replication/worker_internal.h     |   8 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/037_slot_sync.pl          | 130 ++++++
 27 files changed, 1270 insertions(+), 94 deletions(-)
 create mode 100644 src/backend/replication/logical/slotsync.c
 create mode 100644 src/test/recovery/t/037_slot_sync.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c9fa6cd9c7..756db04886 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4450,6 +4450,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        List of physical replication slots that logical replication waits for.
+        If a logical replication connection is meant to switch to a physical
+        standby after the standby is promoted, the physical replication slot
+        for the standby should be listed here.  This ensures that logical
+        replication is not ahead of the physical standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -4598,6 +4615,23 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronize_slot_names" xreflabel="synchronize_slot_names">
+      <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>synchronize_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a list of logical replication slots that a physical standby
+        should synchronize from the primary server.  This is necessary to be
+        able to retarget those logical replication connections to this standby
+        if it gets promoted.  Specify <literal>*</literal> to synchronize all
+        logical replication slots.  The default is empty.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d4e798baeb..42e9b1056c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -993,7 +993,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				logicalrep_worker_stop(MyDatabaseId, sub->oid, relid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -1591,7 +1591,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
+		logicalrep_worker_stop(w->dbid, w->subid, w->relid);
 	}
 	list_free(subworkers);
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5b4bd71694..f2f4475c3b 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
 	},
+	{
+		"ReplSlotSyncMain", ReplSlotSyncMain
+	},
 	{
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
 	}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index dc9c5c82d9..e7ecfa947b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -34,6 +34,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
+#include "utils/varlena.h"
 
 PG_MODULE_MAGIC;
 
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 									  TimeLineID *primary_tli);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names);
 static int	libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 											 TimeLineID tli, char **filename,
@@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_receive = libpqrcv_receive,
 	.walrcv_send = libpqrcv_send,
 	.walrcv_create_slot = libpqrcv_create_slot,
+	.walrcv_list_slots = libpqrcv_list_slots,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
 	.walrcv_disconnect = libpqrcv_disconnect
@@ -409,6 +412,98 @@ libpqrcv_server_version(WalReceiverConn *conn)
 	return PQserverVersion(conn->streamConn);
 }
 
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+	PGresult   *res;
+	List	   *slotlist = NIL;
+	int			ntuples;
+	StringInfoData s;
+	WalRecvReplicationSlotData *slot_data;
+
+	initStringInfo(&s);
+	appendStringInfoString(&s, "LIST_SLOTS");
+
+	if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		appendStringInfoChar(&s, ' ');
+		rawname = pstrdup(slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(lfirst(lc)));
+		}
+	}
+
+	res = libpqrcv_PQexec(conn->streamConn, s.data);
+	pfree(s.data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive list of slots the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	}
+	if (PQnfields(res) < 10)
+	{
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+						   nfields, 10)));
+	}
+
+	ntuples = PQntuples(res);
+	for (int i = 0; i < ntuples; i++)
+	{
+		char	   *slot_type;
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+		namestrcpy(&slot_data->persistent_data.name, PQgetvalue(res, i, 0));
+		if (!PQgetisnull(res, i, 1))
+			namestrcpy(&slot_data->persistent_data.plugin, PQgetvalue(res, i, 1));
+		slot_type = PQgetvalue(res, i, 2);
+		if (!PQgetisnull(res, i, 3))
+			slot_data->persistent_data.database = atooid(PQgetvalue(res, i, 3));
+		if (strcmp(slot_type, "physical") == 0)
+		{
+			if (OidIsValid(slot_data->persistent_data.database))
+				elog(ERROR, "unexpected physical replication slot with database set");
+		}
+		if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+			slot_data->persistent_data.persistency = RS_TEMPORARY;
+		else
+			slot_data->persistent_data.persistency = RS_PERSISTENT;
+		if (!PQgetisnull(res, i, 6))
+			slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6));
+		if (!PQgetisnull(res, i, 7))
+			slot_data->persistent_data.catalog_xmin = atooid(PQgetvalue(res, i, 7));
+		if (!PQgetisnull(res, i, 8))
+			slot_data->persistent_data.restart_lsn = strtou64(PQgetvalue(res, i, 8), NULL, 10);
+		if (!PQgetisnull(res, i, 9))
+			slot_data->persistent_data.confirmed_flush = strtou64(PQgetvalue(res, i, 9), NULL, 10);
+
+		slot_data->last_sync_time = 0;
+		slotlist = lappend(slotlist, slot_data);
+	}
+
+	PQclear(res);
+
+	return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
 	proto.o \
 	relation.o \
 	reorderbuffer.o \
+	slotsync.o \
 	snapbuild.o \
 	tablesync.o \
 	worker.o
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d863..640f7647cc 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
@@ -246,7 +247,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * We are only interested in the leader apply worker or table sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
 {
 	int			i;
 	LogicalRepWorker *res = NULL;
@@ -262,8 +263,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		if (isParallelApplyWorker(w))
 			continue;
 
-		if (w->in_use && w->subid == subid && w->relid == relid &&
-			(!only_running || w->proc))
+		if (w->in_use && w->dbid == dbid && w->subid == subid &&
+			w->relid == relid && (!only_running || w->proc))
 		{
 			res = w;
 			break;
@@ -320,9 +321,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	/* Sanity check - tablesync worker cannot be a subworker */
 	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
 
-	ereport(DEBUG1,
-			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
-							 subname)));
+	if (OidIsValid(subid))
+		ereport(DEBUG1,
+				(errmsg_internal("starting logical replication worker for subscription \"%s\"",
+								 subname)));
+	else
+		ereport(DEBUG1,
+				(errmsg_internal("starting replication slot synchronization worker")));
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -359,7 +364,9 @@ retry:
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL ||
+		(OidIsValid(relid) &&
+		 nsyncworkers >= max_sync_workers_per_subscription))
 	{
 		bool		did_cleanup = false;
 
@@ -455,15 +462,20 @@ retry:
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	if (!OidIsValid(subid))
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
+	else if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
 	else
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 
-	if (OidIsValid(relid))
+	if (!OidIsValid(subid))
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "replication slot synchronization worker");
+	else if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication worker for subscription %u sync %u", subid, relid);
 	else if (is_parallel_apply_worker)
@@ -591,13 +603,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, false);
+	worker = logicalrep_worker_find(dbid, subid, relid, false);
 
 	if (worker)
 	{
@@ -658,13 +670,13 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, true);
+	worker = logicalrep_worker_find(dbid, subid, relid, true);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
@@ -909,7 +921,7 @@ ApplyLauncherRegister(void)
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1092,6 +1104,157 @@ ApplyLauncherWakeup(void)
 		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+static void
+ApplyLauncherStartSlotSync(long *wait_time)
+{
+	WalReceiverConn *wrconn;
+	char	   *err;
+	List	   *slots;
+	ListCell   *lc;
+	MemoryContext tmpctx;
+	MemoryContext oldctx;
+
+	if (strcmp(synchronize_slot_names, "") == 0)
+		return;
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	/* Use temporary context for the slot list and worker info. */
+	tmpctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher slot sync ctx",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(tmpctx);
+
+	slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+	foreach(lc, slots)
+	{
+		WalRecvReplicationSlotData *slot_data = lfirst(lc);
+		LogicalRepWorker *w;
+		TimestampTz last_sync;
+		TimestampTz	now;
+		long		elapsed;
+
+		if (!OidIsValid(slot_data->persistent_data.database))
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(slot_data->persistent_data.database, InvalidOid,
+								   InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.
+		 */
+		last_sync = slot_data->last_sync_time;
+		now = GetCurrentTimestamp();
+		if (last_sync == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+		{
+			slot_data->last_sync_time = now;
+			logicalrep_worker_launch(slot_data->persistent_data.database,
+									 InvalidOid, NULL,
+									 BOOTSTRAP_SUPERUSERID, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(tmpctx);
+
+	walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+	List	   *sublist;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	/* Use temporary context to avoid leaking memory across cycles. */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher sublist",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* Start the missing workers for enabled subscriptions. */
+	sublist = get_subscription_list();
+	foreach(lc, sublist)
+	{
+		Subscription *sub = (Subscription *) lfirst(lc);
+		LogicalRepWorker *w;
+		TimestampTz last_start;
+		TimestampTz now;
+		long		elapsed;
+
+		if (!sub->enabled)
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each subscription's apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.  In cases
+		 * where a restart is expected (e.g., subscription parameter
+		 * changes), another process should remove the last-start entry
+		 * for the subscription so that the worker can be restarted
+		 * without waiting for wal_retrieve_retry_interval to elapse.
+		 */
+		last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+		now = GetCurrentTimestamp();
+		if (last_start == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+		{
+			ApplyLauncherSetWorkerStartTime(sub->oid, now);
+			logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+									 sub->owner, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -1117,78 +1280,20 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	load_file("libpqwalreceiver", false);
+
 	/* Enter main loop */
 	for (;;)
 	{
 		int			rc;
-		List	   *sublist;
-		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Use temporary context to avoid leaking memory across cycles. */
-		subctx = AllocSetContextCreate(TopMemoryContext,
-									   "Logical Replication Launcher sublist",
-									   ALLOCSET_DEFAULT_SIZES);
-		oldctx = MemoryContextSwitchTo(subctx);
-
-		/* Start any missing workers for enabled subscriptions. */
-		sublist = get_subscription_list();
-		foreach(lc, sublist)
-		{
-			Subscription *sub = (Subscription *) lfirst(lc);
-			LogicalRepWorker *w;
-			TimestampTz last_start;
-			TimestampTz now;
-			long		elapsed;
-
-			if (!sub->enabled)
-				continue;
-
-			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-			LWLockRelease(LogicalRepWorkerLock);
-
-			if (w != NULL)
-				continue;		/* worker is running already */
-
-			/*
-			 * If the worker is eligible to start now, launch it.  Otherwise,
-			 * adjust wait_time so that we'll wake up as soon as it can be
-			 * started.
-			 *
-			 * Each subscription's apply worker can only be restarted once per
-			 * wal_retrieve_retry_interval, so that errors do not cause us to
-			 * repeatedly restart the worker as fast as possible.  In cases
-			 * where a restart is expected (e.g., subscription parameter
-			 * changes), another process should remove the last-start entry
-			 * for the subscription so that the worker can be restarted
-			 * without waiting for wal_retrieve_retry_interval to elapse.
-			 */
-			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
-			now = GetCurrentTimestamp();
-			if (last_start == 0 ||
-				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
-			{
-				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-										 sub->owner, InvalidOid,
-										 DSM_HANDLE_INVALID);
-			}
-			else
-			{
-				wait_time = Min(wait_time,
-								wal_retrieve_retry_interval - elapsed);
-			}
-		}
-
-		/* Switch back to original memory context. */
-		MemoryContextSwitchTo(oldctx);
-		/* Clean the temporary memory. */
-		MemoryContextDelete(subctx);
+		if (!RecoveryInProgress())
+			ApplyLauncherStartSubs(&wait_time);
+		else
+			ApplyLauncherStartSlotSync(&wait_time);
 
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..98c93712ef 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -95,11 +95,14 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/logical.h"
+#include "replication/logicalworker.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +110,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2053,6 +2057,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	}
 }
 
+static void
+wait_for_standby_confirmation(XLogRecPtr commit_lsn)
+{
+	char	   *rawname;
+	List	   *namelist;
+	ListCell   *lc;
+	XLogRecPtr	flush_pos = InvalidXLogRecPtr;
+
+	if (strcmp(standby_slot_names, "") == 0)
+		return;
+
+	rawname = pstrdup(standby_slot_names);
+	SplitIdentifierString(rawname, ',', &namelist);
+
+	while (true)
+	{
+		int			wait_slots_remaining;
+		XLogRecPtr	oldest_flush_pos = InvalidXLogRecPtr;
+		int			rc;
+
+		wait_slots_remaining = list_length(namelist);
+
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		for (int i = 0; i < max_replication_slots; i++)
+		{
+			ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+			bool		inlist;
+
+			if (!s->in_use)
+				continue;
+
+			inlist = false;
+			foreach (lc, namelist)
+			{
+				char *name = lfirst(lc);
+				if (strcmp(name, NameStr(s->data.name)) == 0)
+				{
+					inlist = true;
+					break;
+				}
+			}
+			if (!inlist)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+
+			if (s->data.database == InvalidOid)
+				/* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */
+				flush_pos = s->data.restart_lsn;
+			else
+				/* For logical slots we must wait for commit and flush */
+				flush_pos = s->data.confirmed_flush;
+
+			SpinLockRelease(&s->mutex);
+
+			/* We want to find out the min(flush pos) over all named slots */
+			if (oldest_flush_pos == InvalidXLogRecPtr
+				|| oldest_flush_pos > flush_pos)
+				oldest_flush_pos = flush_pos;
+
+			if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
+				wait_slots_remaining --;
+		}
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (wait_slots_remaining == 0)
+			return;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   1000L, PG_WAIT_EXTENSION);
+
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2502,6 +2585,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			 * Call either PREPARE (for two-phase transactions) or COMMIT (for
 			 * regular ones).
 			 */
+
+			wait_for_standby_confirmation(commit_lsn);
+
 			if (rbtxn_prepared(txn))
 				rb->prepare(rb, txn, commit_lsn);
 			else
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..529ddb21ae
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,413 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *	   PostgreSQL worker for synchronizing slots to a standby from primary
+ *
+ * Copyright (c) 2016-2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+char	   *synchronize_slot_names;
+char	   *standby_slot_names;
+
+/*
+ * Wait for remote slot to pass localy reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+							  XLogRecPtr min_lsn)
+{
+	WalRcvExecResult *res;
+	TupleTableSlot *slot;
+	Oid			slotRow[1] = {LSNOID};
+	StringInfoData cmd;
+	bool		isnull;
+	XLogRecPtr	restart_lsn;
+
+	for (;;)
+	{
+		int			rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT restart_lsn"
+						 "  FROM pg_catalog.pg_replication_slots"
+						 " WHERE slot_name = %s",
+						 quote_literal_cstr(slot_name));
+		res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+							slot_name, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+			ereport(ERROR,
+					(errmsg("slot \"%s\" disapeared from provider",
+							slot_name)));
+
+		restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+		walrcv_clear_result(res);
+
+		if (restart_lsn >= min_lsn)
+			break;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
+
+/*
+ * Synchronize single slot to given position.
+ *
+ * This optionally creates new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+					 char *plugin_name, XLogRecPtr target_lsn)
+{
+	bool		found = false;
+	XLogRecPtr	endlsn;
+
+	/* Search for the named slot and mark it active if we find it. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (strcmp(NameStr(s->data.name), slot_name) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	StartTransactionCommand();
+
+	/* Already existing slot, acquire */
+	if (found)
+	{
+		ReplicationSlotAcquire(slot_name, true);
+
+		if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+		{
+			elog(DEBUG1,
+				 "not synchronizing slot %s; synchronization would move it backward",
+				 slot_name);
+
+			ReplicationSlotRelease();
+			CommitTransactionCommand();
+			return;
+		}
+	}
+	/* Otherwise create the slot first. */
+	else
+	{
+		TransactionId xmin_horizon = InvalidTransactionId;
+		ReplicationSlot *slot;
+
+		ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false);
+		slot = MyReplicationSlot;
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.database = get_database_oid(database, false);
+		namestrcpy(&slot->data.plugin, plugin_name);
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotReserveWal();
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		ReplicationSlotsComputeRequiredXmin(true);
+		LWLockRelease(ProcArrayLock);
+
+		if (target_lsn < MyReplicationSlot->data.restart_lsn)
+		{
+			ereport(LOG,
+					errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)",
+						   slot_name,
+						   LSN_FORMAT_ARGS(target_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+			wait_for_primary_slot_catchup(wrconn, slot_name,
+										  MyReplicationSlot->data.restart_lsn);
+		}
+
+		ReplicationSlotPersist();
+	}
+
+	endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+	elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+		 slot_name, LSN_FORMAT_ARGS(endlsn));
+
+	ReplicationSlotRelease();
+	CommitTransactionCommand();
+}
+
+static void
+synchronize_slots(void)
+{
+	WalRcvExecResult *res;
+	WalReceiverConn *wrconn = NULL;
+	TupleTableSlot *slot;
+	Oid			slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+	StringInfoData s;
+	char	   *database;
+	char	   *err;
+	MemoryContext oldctx = CurrentMemoryContext;
+
+	if (!WalRcv)
+		return;
+
+	/* syscache access needs a transaction env. */
+	StartTransactionCommand();
+	/* make dbname live outside TX context */
+	MemoryContextSwitchTo(oldctx);
+
+	database = get_database_name(MyDatabaseId);
+	initStringInfo(&s);
+	appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+	wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err);
+
+	if (wrconn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	resetStringInfo(&s);
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn"
+					 "  FROM pg_catalog.pg_replication_slots"
+					 " WHERE database = %s",
+					 quote_literal_cstr(database));
+	if (strcmp(synchronize_slot_names, "") != 0 && strcmp(synchronize_slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		rawname = pstrdup(synchronize_slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+
+		appendStringInfoString(&s, " AND slot_name IN (");
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_literal_cstr(lfirst(lc)));
+		}
+		appendStringInfoChar(&s, ')');
+	}
+
+	res = walrcv_exec(wrconn, s.data, 3, slotRow);
+	pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch slot info from primary: %s",
+						res->err)));
+
+	CommitTransactionCommand();
+	/* CommitTransactionCommand switches to TopMemoryContext */
+	MemoryContextSwitchTo(oldctx);
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *slot_name;
+		char	   *plugin_name;
+		XLogRecPtr	confirmed_flush_lsn;
+		bool		isnull;
+
+		slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+							 confirmed_flush_lsn);
+
+		ExecClearTuple(slot);
+	}
+
+	walrcv_clear_result(res);
+	pfree(database);
+
+	walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
+
+	/* Establish signal handlers. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	/* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+											  MyLogicalRepWorker->userid,
+											  0);
+
+	StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("replication slot synchronization worker for database \"%s\" has started",
+					get_database_name(MyLogicalRepWorker->dbid))));
+	CommitTransactionCommand();
+
+	/* Main wait loop. */
+	for (;;)
+	{
+		int			rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!RecoveryInProgress())
+			return;
+
+		if (strcmp(synchronize_slot_names, "") == 0)
+			return;
+
+		synchronize_slots();
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
+
+/*
+ * Routines for handling the GUC variable(s)
+ */
+
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+	/* Special handling for "*" which means all. */
+	if (strcmp(*newval, "*") == 0)
+	{
+		return true;
+	}
+	else
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		/* Need a modifiable copy of string */
+		rawname = pstrdup(*newval);
+
+		/* Parse string into list of identifiers */
+		if (!SplitIdentifierString(rawname, ',', &namelist))
+		{
+			/* syntax error in name list */
+			GUC_check_errdetail("List syntax is invalid.");
+			pfree(rawname);
+			list_free(namelist);
+			return false;
+		}
+
+		foreach(lc, namelist)
+		{
+			char	   *curname = (char *) lfirst(lc);
+
+			ReplicationSlotValidateName(curname, ERROR);
+		}
+
+		pfree(rawname);
+		list_free(namelist);
+	}
+
+	return true;
+}
+
+
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+	char	   *rawname;
+	List	   *namelist;
+	ListCell   *lc;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawname, ',', &namelist))
+	{
+		/* syntax error in name list */
+		GUC_check_errdetail("List syntax is invalid.");
+		pfree(rawname);
+		list_free(namelist);
+		return false;
+	}
+
+	foreach(lc, namelist)
+	{
+		char	   *curname = (char *) lfirst(lc);
+
+		ReplicationSlotValidateName(curname, ERROR);
+	}
+
+	pfree(rawname);
+	list_free(namelist);
+
+	return true;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..5a98d2b699 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -156,7 +157,8 @@ finish_sync_worker(void)
 	CommitTransactionCommand();
 
 	/* Find the leader apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+	logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+							 MyLogicalRepWorker->subid, InvalidOid);
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -196,7 +198,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 
 		/* Check if the sync worker is still running and bail if not. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid, relid,
 										false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
@@ -243,7 +246,8 @@ wait_for_worker_state_change(char expected_state)
 		 * waiting.
 		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid,
 										InvalidOid, false);
 		if (worker && worker->proc)
 			logicalrep_worker_wakeup_ptr(worker);
@@ -509,7 +513,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 */
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+			syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+												MyLogicalRepWorker->subid,
 												rstate->relid, false);
 
 			if (syncworker)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0ee764d68f..651f39c616 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1603,7 +1603,8 @@ apply_handle_stream_start(StringInfo s)
 				 * Signal the leader apply worker, as it may be waiting for
 				 * us.
 				 */
-				logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+				logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+										 MyLogicalRepWorker->subid, InvalidOid);
 			}
 
 			parallel_stream_nchanges = 0;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..12a4b74368 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_SLOTS
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show list_slots
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	slot_name_list slot_name_list_opt
 
 %%
 
@@ -114,6 +116,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| list_slots
 			;
 
 /*
@@ -126,6 +129,33 @@ identify_system:
 				}
 			;
 
+slot_name_list:
+			IDENT
+				{
+					$$ = list_make1($1);
+				}
+			| slot_name_list ',' IDENT
+				{
+					$$ = lappend($1, $3);
+				}
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+		;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+			K_LIST_SLOTS slot_name_list_opt
+				{
+					ListSlotsCmd *cmd = makeNode(ListSlotsCmd);
+					cmd->slot_names = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 1cc7fb858c..11064feb86 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+LIST_SLOTS			{ return K_LIST_SLOTS; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
+		case K_LIST_SLOTS:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
 			return true;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..83ada6db6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cedadb0036..f905ae5722 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,194 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	NameData   *slot_names;
+	int			numslot_names;
+
+	numslot_names = list_length(cmd->slot_names);
+	if (numslot_names)
+	{
+		ListCell   *lc;
+		int			i = 0;
+
+		slot_names = palloc(numslot_names * sizeof(NameData));
+		foreach(lc, cmd->slot_names)
+		{
+			char	   *slot_name = lfirst(lc);
+
+			ReplicationSlotValidateName(slot_name, ERROR);
+			namestrcpy(&slot_names[i++], slot_name);
+		}
+
+		qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* need a tuple descriptor representing four columns */
+	tupdesc = CreateTemplateTupleDesc(10);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+							  TEXTOID, -1, 0);
+
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+		char		restart_lsn_str[MAXFNAMELEN];
+		char		confirmed_flush_lsn_str[MAXFNAMELEN];
+		Datum		values[10];
+		bool		nulls[10];
+
+		ReplicationSlotPersistency persistency;
+		TransactionId xmin;
+		TransactionId catalog_xmin;
+		XLogRecPtr	restart_lsn;
+		XLogRecPtr	confirmed_flush_lsn;
+		Oid			datoid;
+		NameData	slot_name;
+		NameData	plugin;
+		int			i;
+		int64		tmpbigint;
+
+		if (!slot->in_use)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		xmin = slot->data.xmin;
+		catalog_xmin = slot->data.catalog_xmin;
+		datoid = slot->data.database;
+		restart_lsn = slot->data.restart_lsn;
+		confirmed_flush_lsn = slot->data.confirmed_flush;
+		namestrcpy(&slot_name, NameStr(slot->data.name));
+		namestrcpy(&plugin, NameStr(slot->data.plugin));
+		persistency = slot->data.persistency;
+
+		SpinLockRelease(&slot->mutex);
+
+		if (numslot_names &&
+			!bsearch((void *) &slot_name, (void *) slot_names,
+					 numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
+		memset(nulls, 0, sizeof(nulls));
+
+		i = 0;
+		values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+		if (datoid == InvalidOid)
+			values[i++] = CStringGetTextDatum("physical");
+		else
+			values[i++] = CStringGetTextDatum("logical");
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			tmpbigint = datoid;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			MemoryContext cur = CurrentMemoryContext;
+
+			/* syscache access needs a transaction env. */
+			StartTransactionCommand();
+			/* make dbname live outside TX context */
+			MemoryContextSwitchTo(cur);
+			values[i++] = CStringGetTextDatum(get_database_name(datoid));
+			CommitTransactionCommand();
+			/* CommitTransactionCommand switches to TopMemoryContext */
+			MemoryContextSwitchTo(cur);
+		}
+
+		values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+		if (xmin != InvalidTransactionId)
+		{
+			tmpbigint = xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (catalog_xmin != InvalidTransactionId)
+		{
+			tmpbigint = catalog_xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (restart_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+					 LSN_FORMAT_ARGS(restart_lsn));
+			values[i++] = CStringGetTextDatum(restart_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		if (confirmed_flush_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+					 "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+			values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		/* send it to dest */
+		do_tup_output(tstate, values, nulls);
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1819,6 +2007,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ListSlotsCmd:
+			cmdtag = "LIST_SLOTS";
+			set_ps_display(cmdtag);
+			ListSlots((ListSlotsCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7af1a61f95..9560748407 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,7 @@ WAIT_EVENT_LOGICAL_APPLY_MAIN	"LogicalApplyMain"	"Waiting in main loop of logica
 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN	"LogicalLauncherMain"	"Waiting in main loop of logical replication launcher process."
 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN	"LogicalParallelApplyMain"	"Waiting in main loop of logical replication parallel apply process."
 WAIT_EVENT_RECOVERY_WAL_STREAM	"RecoveryWalStream"	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+WAIT_EVENT_REPL_SLOT_SYNC_MAIN	"ReplSlotSyncMain"	"Waiting in main loop of worker for synchronizing slots to a standby from primary."
 WAIT_EVENT_SYSLOGGER_MAIN	"SysLoggerMain"	"Waiting in main loop of syslogger process."
 WAIT_EVENT_WAL_RECEIVER_MAIN	"WalReceiverMain"	"Waiting in main loop of WAL receiver process."
 WAIT_EVENT_WAL_SENDER_MAIN	"WalSenderMain"	"Waiting in main loop of WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 59ab630ae4..4add70b54f 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,12 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -4557,6 +4561,28 @@ struct config_string ConfigureNamesString[] =
 		check_io_direct, assign_io_direct, NULL
 	},
 
+	{
+		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the names of replication slots which to synchronize from primary to standby."),
+			gettext_noop("Value of \"*\" means all."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&synchronize_slot_names,
+		"",
+		check_synchronize_slot_names, NULL, NULL
+	},
+
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("List of physical slots that must confirm changes before changes are sent to logical replication consumers."),
+			NULL,
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&standby_slot_names,
+		"",
+		check_standby_slot_names, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e4c0269fa3..7ea4364790 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -329,6 +329,8 @@
 				# method to choose sync standbys, number of sync standbys,
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
+#standby_slot_names = '' # physical standby slot names that logical replication
+				# waits for.
 
 # - Standby Servers -
 
@@ -356,6 +358,7 @@
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
+#synchronize_slot_names = ''		# logical replication slots to sync to standby
 
 # - Subscribers -
 
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..0e77f9ee5c 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
 
 #include "catalog/objectaddress.h"
 #include "parser/parse_node.h"
+#include "replication/walreceiver.h"
 
 extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										bool isTopLevel);
@@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
 
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..980e0b2ee2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+	NodeTag		type;
+	List	   *slot_names;
+} ListSlotsCmd;
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index a07c9cb311..80fdbf9657 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -31,4 +31,6 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern PGDLLIMPORT char *PrimaryConnInfo;
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79f..6408753557 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,10 +14,16 @@
 
 #include <signal.h>
 
+#include "utils/guc.h"
+
+extern char *synchronize_slot_names;
+extern char *standby_slot_names;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
@@ -29,4 +35,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
 
 extern void AtEOXact_LogicalRepWorkers(bool isCommit);
 
+extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra, GucSource source);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..5dc2e0d30d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
-#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -238,7 +237,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
@@ -246,4 +244,7 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
 #endif							/* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..9e9d64faf2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
 	}			proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information receiver from remote.
+ *
+ * Currently same as ReplicationSlotPersistentData except last_sync_time
+ */
+typedef struct WalRecvReplicationSlotData
+{
+	ReplicationSlotPersistentData persistent_data;
+	TimestampTz last_sync_time;
+} WalRecvReplicationSlotData;
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli);
 
+/*
+ * TODO
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_slots_fn walrcv_list_slots;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+	WalReceiverFunctions->walrcv_list_slots(conn, slots)
 #define walrcv_server_version(conn) \
 	WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781896..d42cff3f2c 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -65,7 +65,7 @@ typedef struct LogicalRepWorker
 	 * would be created for each transaction which will be deleted after the
 	 * transaction is finished.
 	 */
-	FileSet    *stream_fileset;
+	struct FileSet    *stream_fileset;
 
 	/*
 	 * PID of leader apply worker if this slot is used for a parallel apply
@@ -228,15 +228,15 @@ extern PGDLLIMPORT bool in_remote_transaction;
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 2008958010..b6fcc8704e 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -42,6 +42,7 @@ tests += {
       't/034_create_database.pl',
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
+      't/037_slot_sync.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/037_slot_sync.pl b/src/test/recovery/t/037_slot_sync.pl
new file mode 100644
index 0000000000..0520042d96
--- /dev/null
+++ b/src/test/recovery/t/037_slot_sync.pl
@@ -0,0 +1,130 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Check invalidation in the logfile
+sub check_for_invalidation
+{
+	my ($log_start, $test_name) = @_;
+
+	# message should be issued
+	ok( find_in_log(
+		$node_phys_standby,
+        "invalidating obsolete replication slot \"sub1\"", $log_start),
+        "sub1 slot invalidation is logged $test_name");
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my $res = $node_phys_standby->safe_psql(
+				'postgres', qq(
+				select bool_and(conflicting) from pg_replication_slots;));
+
+	is($res, 't',
+		"Logical slot is reported as conflicting");
+}
+
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'");
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
+# WAL. An insert into flush_wal outside transaction does guarantee a flush.
+$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]);
+
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
+
+$node_primary->wait_for_catchup($node_phys_standby->name);
+
+# Logical subscriber and physical replica are caught up at this point.
+
+# Drop the subscription so that catalog_xmin is unknown on the primary
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+
+# This should trigger a conflict as hot_standby_feedback is off on the standby
+$node_primary->safe_psql('postgres', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+# Ensure physical replay catches up
+$node_primary->wait_for_catchup($node_phys_standby);
+
+# Check invalidation in the logfile
+check_for_invalidation(1, 'with vacuum FULL on pg_class');
+
+# Check conflicting status in pg_replication_slots.
+check_slots_conflicting_status();
+
+done_testing();
-- 
2.34.1

