On Fri, Jul 21, 2023 at 5:16 PM shveta malik <shveta.ma...@gmail.com> wrote:
>
> Thanks Bharat for letting us know. It is okay to split the patch, it
> may definitely help to understand the modules better but shall we take
> a step back and try to reevaluate the design first before moving to
> other tasks?

Agreed that the design comes first. FWIW, I'm attaching the v9 patch
set that I have with me. It can't be a polished patch set until the
design is finalized.

> I analyzed more on the issues stated in [1] for replacing LIST_SLOTS
> with SELECT query. On rethinking, it might not be a good idea to
> replace this cmd with SELECT in Launcher code-path

I think there are fundamental design aspects still open (see below)
that we should settle before optimizing LIST_SLOTS. I'm sure we can
come back to this later.

> Secondly, I was thinking if the design proposed in the patch is the
> best one. No doubt, it is the most simplistic design and thus may
> .......... Any feedback is appreciated.

Here are my thoughts about this feature:

Current design:

1. On the primary, never allow walsenders associated with logical
replication slots to get ahead of the physical standbys that are
candidates to become the primary after failover. This enables
subscribers to connect to the new primary after failover.
2. On all candidate standbys, periodically sync logical slots from the
primary (creating the slots if necessary) with one slot sync worker
per logical slot. A minimal configuration sketch follows below.
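
As an illustration only, here is a minimal sketch of how the two GUCs
from the attached patches might be set up, assuming a physical slot
sb1_slot used by the candidate standby and a logical slot lsub1_slot
used by a subscriber (the same names as in the attached TAP test).
Both GUCs are PGC_SIGHUP in the patch, so a reload should suffice:

    -- On the primary: lsub1_slot's walsender waits for the standby using sb1_slot
    ALTER SYSTEM SET standby_slot_names = 'sb1_slot';
    ALTER SYSTEM SET synchronize_slot_names = 'lsub1_slot';
    SELECT pg_reload_conf();

    -- On the candidate standby (primary_conninfo and primary_slot_name =
    -- 'sb1_slot' configured as usual): sync lsub1_slot from the primary
    ALTER SYSTEM SET synchronize_slot_names = 'lsub1_slot';
    SELECT pg_reload_conf();

    -- After failover, the subscription can be retargeted to the promoted
    -- standby (hypothetical host name), which already has lsub1_slot synced
    ALTER SUBSCRIPTION mysub1 CONNECTION 'host=promoted_standby dbname=postgres';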

Important considerations:

1. Does this design guarantee the row versions required by subscribers
aren't removed on candidate standbys as raised here -
https://www.postgresql.org/message-id/20220218222319.yozkbhren7vkjbi5%40alap3.anarazel.de?

It seems safe with the logical decoding on standby feature. Also, a
test case from upthread is already in the patch sets (in v9 too):
https://www.postgresql.org/message-id/CAAaqYe9FdKODa1a9n%3Dqj%2Bw3NiB9gkwvhRHhcJNginuYYRCnLrg%40mail.gmail.com.
However, we need to verify the use cases extensively.

2. All candidate standbys will start one slot sync worker per logical
slot, which might not be scalable. Is having one worker (or a few, not
necessarily one per logical slot) for all logical slots enough?

It seems safe to have one worker for all logical slots - it's not a
problem even if the worker takes a while to get around to syncing a
given logical slot on a candidate standby, because the standby is
guaranteed to retain all the WAL and row versions required to decode
and send changes to the logical slots.

3. Having logical walsenders wait indefinitely for candidate standbys
may not be a good idea. Would a timeout for logical walsenders help?

A problem with a timeout is that it can make logical slots unusable
after failover.

4. All candidate standbys retain the WAL required by logical slots.
The amount of WAL retained may be huge if logical replication
subscribers lag behind.

This is a typical problem with replication, so there's not much this
feature can do to prevent WAL accumulation other than asking users to
monitor replication lag and WAL growth, for instance with queries like
the ones below.
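
These use only the existing catalog views (nothing specific to this
patch set); retained_bytes approximates how much WAL each slot pins on
the primary, and replay_lag_bytes shows how far each physical standby
is behind:

    -- WAL retained by each slot, in bytes behind the current insert position
    SELECT slot_name, slot_type, active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
    FROM pg_replication_slots;

    -- Replay lag of each physical standby, in bytes
    SELECT application_name,
           pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
    FROM pg_stat_replication;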

5. Logical subscribers' replication lag will depend on the candidate
standbys' replication lag. If the candidate standbys are far behind
the primary while the logical subscribers are close to it, the logical
subscribers will still lag behind. There's not much this feature can
do to prevent this except calling it out in the documentation.

6. This feature might need to prevent the GUCs from diverging between
the primary and the candidate standbys - there's no point in syncing a
logical slot on a candidate standby if the logical walsender related
to it on the primary isn't keeping itself behind all the candidate
standbys. If preventing this proves to be tough, documenting that the
GUCs must be kept the same on both sides is a good start.

7. There are some important review comments upthread concerning this
design and the patches -
https://www.postgresql.org/message-id/20220207204557.74mgbhowydjco4mh%40alap3.anarazel.de
and
https://www.postgresql.org/message-id/20220207203222.22aktwxrt3fcllru%40alap3.anarazel.de.
I'm sure we can come back to these once the design is clear.

Please feel free to add to the list if I'm missing anything.

Thoughts?

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 666a73e79d9965779488db1cac6cd2d0a2c73ffb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 22 Jul 2023 10:17:48 +0000
Subject: [PATCH v9] Allow logical walsenders to wait for physical standbys

---
 doc/src/sgml/config.sgml                      |  42 ++++
 .../replication/logical/reorderbuffer.c       |   9 +
 src/backend/replication/slot.c                | 216 +++++++++++++++++-
 src/backend/utils/misc/guc_tables.c           |  30 +++
 src/backend/utils/misc/postgresql.conf.sample |   4 +
 src/include/replication/slot.h                |   4 +
 src/include/utils/guc_hooks.h                 |   4 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_verify_slot_order.pl  | 146 ++++++++++++
 9 files changed, 455 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/050_verify_slot_order.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 11251fa05e..83a7d2e87e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4397,6 +4397,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        List of physical replication slots that logical replication waits for.
+        Specify <literal>*</literal> to wait for all physical replication
+        slots. If a logical replication connection is meant to switch to a
+        physical standby after the standby is promoted, the physical
+        replication slot for the standby should be listed here. This ensures
+        that logical replication is not ahead of the physical standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -4545,6 +4563,30 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronize_slot_names" xreflabel="synchronize_slot_names">
+      <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>synchronize_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a list of logical replication slots that a streaming
+        replication standby should synchronize from the primary server. This is
+        necessary to be able to retarget those logical replication connections
+        to this standby if it gets promoted.  Specify <literal>*</literal> to
+        synchronize all logical replication slots. The default is empty. On
+        the primary, the logical walsenders associated with the logical
+        replication slots specified in this parameter will wait for the
+        standby servers specified in the <xref linkend="guc-standby-slot-names"/>
+        parameter. In other words, the primary ensures those slots never get
+        ahead of the standby servers. On a standby server, the specified
+        logical replication slots are synchronized from the primary. Set this
+        parameter to the same value on both the primary and the standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..f7a7050d2c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -100,6 +100,7 @@
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +108,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2498,6 +2500,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		}
 		else
 		{
+			/*
+			 * Before we send out the last set of changes to logical decoding
+			 * output plugin, wait for specified streaming replication standby
+			 * servers (if any) to confirm receipt of WAL up to commit_lsn.
+			 */
+			WaitForStandbyLSN(commit_lsn);
+
 			/*
 			 * Call either PREPARE (for two-phase transactions) or COMMIT (for
 			 * regular ones).
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1dc27264f6..dc1d11a564 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -52,6 +52,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/varlena.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
+/* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+char	*synchronize_slot_names;
+char	*standby_slot_names;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -111,6 +115,8 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
 
+static bool validate_slot_names(char **newval);
+
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
  */
@@ -2085,3 +2091,211 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * A helper function to simplify check_hook implementation for
+ * synchronize_slot_names and standby_slot_names GUCs.
+ */
+static bool
+validate_slot_names(char **newval)
+{
+	char	   *rawname;
+	List	   *elemlist;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawname, ',', &elemlist))
+	{
+		/* syntax error in name list */
+		GUC_check_errdetail("List syntax is invalid.");
+		pfree(rawname);
+		list_free(elemlist);
+		return false;
+	}
+
+	pfree(rawname);
+	list_free(elemlist);
+	return true;
+}
+
+/*
+ * GUC check_hook for synchronize_slot_names
+ */
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+	/* Special handling for "*" which means all. */
+	if (strcmp(*newval, "*") == 0)
+		return true;
+
+	if (strcmp(*newval, "") == 0)
+		return true;
+
+	return validate_slot_names(newval);
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+	/* Special handling for "*" which means all. */
+	if (strcmp(*newval, "*") == 0)
+		return true;
+
+	if (strcmp(*newval, "") == 0)
+		return true;
+
+	return validate_slot_names(newval);
+}
+
+/*
+ * Wait for the physical standbys corresponding to the physical slots
+ * specified in the standby_slot_names GUC, if the calling logical walsender's
+ * slot is listed in the synchronize_slot_names GUC.
+ */
+void
+WaitForStandbyLSN(XLogRecPtr wait_for_lsn)
+{
+	char	*rawname;
+	List	*elemlist;
+	ListCell	*l;
+	ReplicationSlot *slot;
+
+	Assert(MyReplicationSlot != NULL);
+	Assert(SlotIsLogical(MyReplicationSlot));
+
+	if (strcmp(standby_slot_names, "") == 0)
+		return;
+
+	/*
+	 * Check if the slot associated with this logical walsender is asked to
+	 * wait for physical standbys.
+	 */
+	if (strcmp(synchronize_slot_names, "") == 0)
+		return;
+
+	/* "*" means all logical walsenders should wait for physical standbys. */
+	if (strcmp(synchronize_slot_names, "*") != 0)
+	{
+		bool	shouldwait = false;
+
+		rawname = pstrdup(synchronize_slot_names);
+		SplitIdentifierString(rawname, ',', &elemlist);
+
+		foreach (l, elemlist)
+		{
+			char *name = lfirst(l);
+			if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0)
+			{
+				shouldwait = true;
+				break;
+			}
+		}
+
+		pfree(rawname);
+		rawname = NULL;
+		list_free(elemlist);
+		elemlist = NIL;
+
+		if (!shouldwait)
+			return;
+	}
+
+	rawname = pstrdup(standby_slot_names);
+	SplitIdentifierString(rawname, ',', &elemlist);
+
+retry:
+
+	foreach (l, elemlist)
+	{
+		char *name = lfirst(l);
+		XLogRecPtr	restart_lsn;
+		bool	invalidated;
+
+		slot = SearchNamedReplicationSlot(name, true);
+
+		/*
+		 * It may happen that the slot specified in standby_slot_names GUC
+		 * value is dropped, so let's skip over it.
+		 */
+		if (!slot)
+		{
+			ereport(WARNING,
+					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/*
+		 * It may happen that the physical slot specified in standby_slot_names
+		 * is dropped without removing it from the GUC value, and a logical
+		 * slot has been created with the same name meanwhile. Let's skip over
+		 * it.
+		 *
+		 * NB: One might think of modifying the GUC value automatically when a
+		 * physical replication slot is dropped, but that isn't a great idea
+		 * given that the slot can sometimes be dropped in process exit paths
+		 * (check ReplicationSlotCleanup call sites), where changing the GUC
+		 * value isn't feasible.
+		 */
+		if (SlotIsLogical(slot))
+		{
+			ereport(WARNING,
+					errmsg("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/* physical slots advance restart_lsn on remote flush */
+		SpinLockAcquire(&slot->mutex);
+		restart_lsn = slot->data.restart_lsn;
+		invalidated = slot->data.invalidated != RS_INVAL_NONE;
+		SpinLockRelease(&slot->mutex);
+
+		/*
+		 * Specified physical slot may have been invalidated, so no point in
+		 * waiting for it.
+		*/
+		if (restart_lsn == InvalidXLogRecPtr || invalidated)
+		{
+			ereport(WARNING,
+					errmsg("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/* If the slot is past the wait_for_lsn, no need to wait anymore */
+		if (restart_lsn >= wait_for_lsn)
+		{
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+	}
+
+	if (list_length(elemlist) == 0)
+	{
+		pfree(rawname);
+		return; 	/* Exit if done waiting for everyone */
+	}
+
+	/* XXX: Is waiting for 1 second before retrying enough or more or less? */
+
+	/* XXX: Need to have a new wait event type. */
+	(void) WaitLatch(MyLatch,
+					 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					 1000L,
+					 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+	ResetLatch(MyLatch);
+
+	CHECK_FOR_INTERRUPTS();
+
+	goto retry;
+}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f9dba43b8c..d72b6b95b6 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4551,6 +4551,36 @@ struct config_string ConfigureNamesString[] =
 		check_io_direct, assign_io_direct, NULL
 	},
 
+	/*
+	 * XXX: synchronize_slot_names needs to be specified on both primary and
+	 * standby, therefore, we might need a new group REPLICATION.
+	 */
+	{
+		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("List of replication slot names to synchronize from "
+						 "primary to streaming replication standby server."),
+			gettext_noop("Value of \"*\" means all."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&synchronize_slot_names,
+		"",
+		check_synchronize_slot_names, NULL, NULL
+	},
+
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("List of streaming replication standby server slot "
+						 "names that logical walsenders wait for."),
+			gettext_noop("Decoded changes are sent out to plugins by logical "
+						 "walsenders only after specified replication slots "
+						 "confirm receiving WAL."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&standby_slot_names,
+		"",
+		check_standby_slot_names, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c768af9a73..63daf586f3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -328,6 +328,8 @@
 				# method to choose sync standbys, number of sync standbys,
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+				# logical walsenders wait for
 
 # - Standby Servers -
 
@@ -355,6 +357,8 @@
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
+#synchronize_slot_names = ''	# replication slot names to synchronize from
+					# primary to streaming replication standby server
 
 # - Subscribers -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..2765f99ccf 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -203,6 +203,8 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT char *synchronize_slot_names;
+extern PGDLLIMPORT char *standby_slot_names;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -246,4 +248,6 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 
+extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn);
+
 #endif							/* SLOT_H */
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 2ecb9fc086..259aefb9d7 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -159,5 +159,9 @@ extern void assign_wal_consistency_checking(const char *newval, void *extra);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 extern bool check_io_direct(char **newval, void **extra, GucSource source);
 extern void assign_io_direct(const char *newval, void *extra);
+extern bool check_synchronize_slot_names(char **newval, void **extra,
+										 GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra,
+									 GucSource source);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index e7328e4894..ee590eeac7 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
+      't/050_verify_slot_order.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl
new file mode 100644
index 0000000000..402b704e3f
--- /dev/null
+++ b/src/test/recovery/t/050_verify_slot_order.pl
@@ -0,0 +1,146 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test primary disallowing specified logical replication slots getting ahead of
+# specified physical replication slots. It uses the following set up:
+#
+#           	| ----> standby1 (connected via streaming replication)
+#				| ----> standby2 (connected via streaming replication)
+# primary -----	|
+#		    	| ----> subscriber1 (connected via logical replication)
+#		    	| ----> subscriber2 (connected via logical replication)
+#
+# The setup is configured in such a way that the primary never lets subscriber1
+# get ahead of standby1.
+
+# Create primary
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Configure primary to disallow specified logical replication slot (lsub1_slot)
+# getting ahead of specified physical replication slot (sb1_slot).
+$primary->append_conf(
+	'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+synchronize_slot_names = 'lsub1_slot'
+));
+$primary->start;
+
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb1_slot');});
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb2_slot');});
+
+$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+
+my $backup_name = 'backup';
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby1->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb1_slot'
+));
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Create another standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb2_slot'
+));
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Create publication on primary
+my $publisher = $primary;
+$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;");
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+
+# Create a subscriber node, wait for sync to complete
+my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$subscriber1->init(allows_streaming => 'logical');
+$subscriber1->start;
+$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber1->safe_psql('postgres',
+		"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' "
+	  . "PUBLICATION mypub WITH (slot_name = lsub1_slot);");
+$subscriber1->wait_for_subscription_sync;
+
+# Create another subscriber node, wait for sync to complete
+my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$subscriber2->init(allows_streaming => 'logical');
+$subscriber2->start;
+$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber2->safe_psql('postgres',
+		"CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' "
+	  . "PUBLICATION mypub WITH (slot_name = lsub2_slot);");
+$subscriber2->wait_for_subscription_sync;
+
+# Stop the standby associated with specified physical replication slot so that
+# the logical replication slot won't receive changes until the standby comes
+# up.
+$standby1->stop;
+
+# Create some data on primary
+my $primary_row_count = 10;
+my $primary_insert_time = time();
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");
+
+# Wait for the standby that's up and running to get the data from the primary
+$primary->wait_for_replay_catchup($standby2);
+my $result = $standby2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby2 gets data from primary");
+
+# Wait for the subscriber that's up and running and not specified in the
+# synchronize_slot_names GUC on the primary to get the data from the primary
+# without waiting for any standbys.
+$publisher->wait_for_catchup('mysub2');
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber2 gets data from primary");
+
+# The subscriber that's up and running and specified in the
+# synchronize_slot_names GUC on the primary doesn't get the data from the
+# primary and keeps waiting for the standby specified in standby_slot_names.
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = 0 FROM tab_int;");
+is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes");
+
+# Start the standby specified in standby_slot_names and wait for it to catch
+# up with the primary.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Now that the standby specified in standby_slot_names is up and running, the
+# primary must send the decoded changes to the subscriber specified in
+# synchronize_slot_names. While the standby was down, this subscriber didn't
+# receive any data from the primary, i.e. the primary didn't allow it to get
+# ahead of the standby.
+$publisher->wait_for_catchup('mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+done_testing();
-- 
2.34.1

From 30d4af524c1b2128519d3b0af761874682961be5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 22 Jul 2023 10:28:21 +0000
Subject: [PATCH v9] Add logical slot sync capability to physical standby

---
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/bgworker.c             |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  95 +++++
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    | 263 +++++++++-----
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/slotsync.c    | 332 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  13 +-
 src/backend/replication/logical/worker.c      |   3 +-
 src/backend/replication/repl_gram.y           |  32 +-
 src/backend/replication/repl_scanner.l        |   2 +
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           | 195 ++++++++++
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |   3 +
 src/include/commands/subscriptioncmds.h       |   3 +
 src/include/nodes/replnodes.h                 |   9 +
 src/include/replication/logicallauncher.h     |   2 +
 src/include/replication/logicalworker.h       |   1 +
 src/include/replication/slot.h                |   5 +-
 src/include/replication/walreceiver.h         |  20 ++
 src/include/replication/worker_internal.h     |   8 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/051_slot_sync.pl          | 132 +++++++
 24 files changed, 1037 insertions(+), 94 deletions(-)
 create mode 100644 src/backend/replication/logical/slotsync.c
 create mode 100644 src/test/recovery/t/051_slot_sync.pl

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d4e798baeb..42e9b1056c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -993,7 +993,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				logicalrep_worker_stop(MyDatabaseId, sub->oid, relid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -1591,7 +1591,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
+		logicalrep_worker_stop(w->dbid, w->subid, w->relid);
 	}
 	list_free(subworkers);
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5b4bd71694..f2f4475c3b 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
 	},
+	{
+		"ReplSlotSyncMain", ReplSlotSyncMain
+	},
 	{
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
 	}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 60d5c1fc40..0e13cc2417 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -34,6 +34,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
+#include "utils/varlena.h"
 
 PG_MODULE_MAGIC;
 
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 									  TimeLineID *primary_tli);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names);
 static int	libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 											 TimeLineID tli, char **filename,
@@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_receive = libpqrcv_receive,
 	.walrcv_send = libpqrcv_send,
 	.walrcv_create_slot = libpqrcv_create_slot,
+	.walrcv_list_slots = libpqrcv_list_slots,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
 	.walrcv_disconnect = libpqrcv_disconnect
@@ -409,6 +412,98 @@ libpqrcv_server_version(WalReceiverConn *conn)
 	return PQserverVersion(conn->streamConn);
 }
 
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+	PGresult   *res;
+	List	   *slotlist = NIL;
+	int			ntuples;
+	StringInfoData s;
+	WalRecvReplicationSlotData *slot_data;
+
+	initStringInfo(&s);
+	appendStringInfoString(&s, "LIST_SLOTS");
+
+	if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		appendStringInfoChar(&s, ' ');
+		rawname = pstrdup(slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(lfirst(lc)));
+		}
+	}
+
+	res = libpqrcv_PQexec(conn->streamConn, s.data);
+	pfree(s.data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive list of slots from the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	}
+	if (PQnfields(res) < 10)
+	{
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+						   nfields, 10)));
+	}
+
+	ntuples = PQntuples(res);
+	for (int i = 0; i < ntuples; i++)
+	{
+		char	   *slot_type;
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+		namestrcpy(&slot_data->persistent_data.name, PQgetvalue(res, i, 0));
+		if (!PQgetisnull(res, i, 1))
+			namestrcpy(&slot_data->persistent_data.plugin, PQgetvalue(res, i, 1));
+		slot_type = PQgetvalue(res, i, 2);
+		if (!PQgetisnull(res, i, 3))
+			slot_data->persistent_data.database = atooid(PQgetvalue(res, i, 3));
+		if (strcmp(slot_type, "physical") == 0)
+		{
+			if (OidIsValid(slot_data->persistent_data.database))
+				elog(ERROR, "unexpected physical replication slot with database set");
+		}
+		if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+			slot_data->persistent_data.persistency = RS_TEMPORARY;
+		else
+			slot_data->persistent_data.persistency = RS_PERSISTENT;
+		if (!PQgetisnull(res, i, 6))
+			slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6));
+		if (!PQgetisnull(res, i, 7))
+			slot_data->persistent_data.catalog_xmin = atooid(PQgetvalue(res, i, 7));
+		if (!PQgetisnull(res, i, 8))
+			slot_data->persistent_data.restart_lsn = strtou64(PQgetvalue(res, i, 8), NULL, 10);
+		if (!PQgetisnull(res, i, 9))
+			slot_data->persistent_data.confirmed_flush = strtou64(PQgetvalue(res, i, 9), NULL, 10);
+
+		slot_data->last_sync_time = 0;
+		slotlist = lappend(slotlist, slot_data);
+	}
+
+	PQclear(res);
+
+	return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
 	proto.o \
 	relation.o \
 	reorderbuffer.o \
+	slotsync.o \
 	snapbuild.o \
 	tablesync.o \
 	worker.o
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d863..640f7647cc 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
@@ -246,7 +247,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * We are only interested in the leader apply worker or table sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
 {
 	int			i;
 	LogicalRepWorker *res = NULL;
@@ -262,8 +263,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		if (isParallelApplyWorker(w))
 			continue;
 
-		if (w->in_use && w->subid == subid && w->relid == relid &&
-			(!only_running || w->proc))
+		if (w->in_use && w->dbid == dbid && w->subid == subid &&
+			w->relid == relid && (!only_running || w->proc))
 		{
 			res = w;
 			break;
@@ -320,9 +321,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	/* Sanity check - tablesync worker cannot be a subworker */
 	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
 
-	ereport(DEBUG1,
-			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
-							 subname)));
+	if (OidIsValid(subid))
+		ereport(DEBUG1,
+				(errmsg_internal("starting logical replication worker for subscription \"%s\"",
+								 subname)));
+	else
+		ereport(DEBUG1,
+				(errmsg_internal("starting replication slot synchronization worker")));
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -359,7 +364,9 @@ retry:
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL ||
+		(OidIsValid(relid) &&
+		 nsyncworkers >= max_sync_workers_per_subscription))
 	{
 		bool		did_cleanup = false;
 
@@ -455,15 +462,20 @@ retry:
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	if (!OidIsValid(subid))
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
+	else if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
 	else
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 
-	if (OidIsValid(relid))
+	if (!OidIsValid(subid))
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "replication slot synchronization worker");
+	else if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication worker for subscription %u sync %u", subid, relid);
 	else if (is_parallel_apply_worker)
@@ -591,13 +603,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, false);
+	worker = logicalrep_worker_find(dbid, subid, relid, false);
 
 	if (worker)
 	{
@@ -658,13 +670,13 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, true);
+	worker = logicalrep_worker_find(dbid, subid, relid, true);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
@@ -909,7 +921,7 @@ ApplyLauncherRegister(void)
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1092,6 +1104,157 @@ ApplyLauncherWakeup(void)
 		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+static void
+ApplyLauncherStartSlotSync(long *wait_time)
+{
+	WalReceiverConn *wrconn;
+	char	   *err;
+	List	   *slots;
+	ListCell   *lc;
+	MemoryContext tmpctx;
+	MemoryContext oldctx;
+
+	if (strcmp(synchronize_slot_names, "") == 0)
+		return;
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	/* Use temporary context for the slot list and worker info. */
+	tmpctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher slot sync ctx",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(tmpctx);
+
+	slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+	foreach(lc, slots)
+	{
+		WalRecvReplicationSlotData *slot_data = lfirst(lc);
+		LogicalRepWorker *w;
+		TimestampTz last_sync;
+		TimestampTz	now;
+		long		elapsed;
+
+		if (!OidIsValid(slot_data->persistent_data.database))
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(slot_data->persistent_data.database, InvalidOid,
+								   InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each slot sync worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.
+		 */
+		last_sync = slot_data->last_sync_time;
+		now = GetCurrentTimestamp();
+		if (last_sync == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+		{
+			slot_data->last_sync_time = now;
+			logicalrep_worker_launch(slot_data->persistent_data.database,
+									 InvalidOid, NULL,
+									 BOOTSTRAP_SUPERUSERID, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(tmpctx);
+
+	walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+	List	   *sublist;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	/* Use temporary context to avoid leaking memory across cycles. */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher sublist",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* Start the missing workers for enabled subscriptions. */
+	sublist = get_subscription_list();
+	foreach(lc, sublist)
+	{
+		Subscription *sub = (Subscription *) lfirst(lc);
+		LogicalRepWorker *w;
+		TimestampTz last_start;
+		TimestampTz now;
+		long		elapsed;
+
+		if (!sub->enabled)
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each subscription's apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.  In cases
+		 * where a restart is expected (e.g., subscription parameter
+		 * changes), another process should remove the last-start entry
+		 * for the subscription so that the worker can be restarted
+		 * without waiting for wal_retrieve_retry_interval to elapse.
+		 */
+		last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+		now = GetCurrentTimestamp();
+		if (last_start == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+		{
+			ApplyLauncherSetWorkerStartTime(sub->oid, now);
+			logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+									 sub->owner, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -1117,78 +1280,20 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	load_file("libpqwalreceiver", false);
+
 	/* Enter main loop */
 	for (;;)
 	{
 		int			rc;
-		List	   *sublist;
-		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Use temporary context to avoid leaking memory across cycles. */
-		subctx = AllocSetContextCreate(TopMemoryContext,
-									   "Logical Replication Launcher sublist",
-									   ALLOCSET_DEFAULT_SIZES);
-		oldctx = MemoryContextSwitchTo(subctx);
-
-		/* Start any missing workers for enabled subscriptions. */
-		sublist = get_subscription_list();
-		foreach(lc, sublist)
-		{
-			Subscription *sub = (Subscription *) lfirst(lc);
-			LogicalRepWorker *w;
-			TimestampTz last_start;
-			TimestampTz now;
-			long		elapsed;
-
-			if (!sub->enabled)
-				continue;
-
-			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-			LWLockRelease(LogicalRepWorkerLock);
-
-			if (w != NULL)
-				continue;		/* worker is running already */
-
-			/*
-			 * If the worker is eligible to start now, launch it.  Otherwise,
-			 * adjust wait_time so that we'll wake up as soon as it can be
-			 * started.
-			 *
-			 * Each subscription's apply worker can only be restarted once per
-			 * wal_retrieve_retry_interval, so that errors do not cause us to
-			 * repeatedly restart the worker as fast as possible.  In cases
-			 * where a restart is expected (e.g., subscription parameter
-			 * changes), another process should remove the last-start entry
-			 * for the subscription so that the worker can be restarted
-			 * without waiting for wal_retrieve_retry_interval to elapse.
-			 */
-			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
-			now = GetCurrentTimestamp();
-			if (last_start == 0 ||
-				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
-			{
-				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-										 sub->owner, InvalidOid,
-										 DSM_HANDLE_INVALID);
-			}
-			else
-			{
-				wait_time = Min(wait_time,
-								wal_retrieve_retry_interval - elapsed);
-			}
-		}
-
-		/* Switch back to original memory context. */
-		MemoryContextSwitchTo(oldctx);
-		/* Clean the temporary memory. */
-		MemoryContextDelete(subctx);
+		if (!RecoveryInProgress())
+			ApplyLauncherStartSubs(&wait_time);
+		else
+			ApplyLauncherStartSlotSync(&wait_time);
 
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..77457001e7
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,332 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *	   PostgreSQL worker for synchronizing slots to a standby from primary
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+							  XLogRecPtr min_lsn)
+{
+	WalRcvExecResult *res;
+	TupleTableSlot *slot;
+	Oid			slotRow[1] = {LSNOID};
+	StringInfoData cmd;
+	bool		isnull;
+	XLogRecPtr	restart_lsn;
+
+	for (;;)
+	{
+		int			rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT restart_lsn"
+						 "  FROM pg_catalog.pg_replication_slots"
+						 " WHERE slot_name = %s",
+						 quote_literal_cstr(slot_name));
+		res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+							slot_name, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+			ereport(ERROR,
+					(errmsg("slot \"%s\" disappeared from the primary",
+							slot_name)));
+
+		restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+		walrcv_clear_result(res);
+
+		if (restart_lsn >= min_lsn)
+			break;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
+
+/*
+ * Synchronize single slot to given position.
+ *
+ * This creates a new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+					 char *plugin_name, XLogRecPtr target_lsn)
+{
+	bool		found = false;
+	XLogRecPtr	endlsn;
+
+	/* Search for the named slot and mark it active if we find it. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (strcmp(NameStr(s->data.name), slot_name) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	StartTransactionCommand();
+
+	/* Already existing slot, acquire */
+	if (found)
+	{
+		ReplicationSlotAcquire(slot_name, true);
+
+		if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+		{
+			elog(DEBUG1,
+				 "not synchronizing slot %s; synchronization would move it backward",
+				 slot_name);
+
+			ReplicationSlotRelease();
+			CommitTransactionCommand();
+			return;
+		}
+	}
+	/* Otherwise create the slot first. */
+	else
+	{
+		TransactionId xmin_horizon = InvalidTransactionId;
+		ReplicationSlot *slot;
+
+		ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false);
+		slot = MyReplicationSlot;
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.database = get_database_oid(database, false);
+		namestrcpy(&slot->data.plugin, plugin_name);
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotReserveWal();
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		ReplicationSlotsComputeRequiredXmin(true);
+		LWLockRelease(ProcArrayLock);
+
+		if (target_lsn < MyReplicationSlot->data.restart_lsn)
+		{
+			ereport(LOG,
+					errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)",
+						   slot_name,
+						   LSN_FORMAT_ARGS(target_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+			wait_for_primary_slot_catchup(wrconn, slot_name,
+										  MyReplicationSlot->data.restart_lsn);
+		}
+
+		ReplicationSlotPersist();
+	}
+
+	endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+	elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+		 slot_name, LSN_FORMAT_ARGS(endlsn));
+
+	ReplicationSlotRelease();
+	CommitTransactionCommand();
+}
+
+static void
+synchronize_slots(void)
+{
+	WalRcvExecResult *res;
+	WalReceiverConn *wrconn = NULL;
+	TupleTableSlot *slot;
+	Oid			slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+	StringInfoData s;
+	char	   *database;
+	char	   *err;
+	MemoryContext oldctx = CurrentMemoryContext;
+
+	if (!WalRcv)
+		return;
+
+	/* syscache access needs a transaction env. */
+	StartTransactionCommand();
+	/* make dbname live outside TX context */
+	MemoryContextSwitchTo(oldctx);
+
+	database = get_database_name(MyDatabaseId);
+	initStringInfo(&s);
+	appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+	wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err);
+
+	if (wrconn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	resetStringInfo(&s);
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn"
+					 "  FROM pg_catalog.pg_replication_slots"
+					 " WHERE database = %s",
+					 quote_literal_cstr(database));
+	if (strcmp(synchronize_slot_names, "") != 0 && strcmp(synchronize_slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		rawname = pstrdup(synchronize_slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+
+		appendStringInfoString(&s, " AND slot_name IN (");
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_literal_cstr(lfirst(lc)));
+		}
+		appendStringInfoChar(&s, ')');
+	}
+
+	res = walrcv_exec(wrconn, s.data, 3, slotRow);
+	pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch slot info from primary: %s",
+						res->err)));
+
+	CommitTransactionCommand();
+	/* CommitTransactionCommand switches to TopMemoryContext */
+	MemoryContextSwitchTo(oldctx);
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *slot_name;
+		char	   *plugin_name;
+		XLogRecPtr	confirmed_flush_lsn;
+		bool		isnull;
+
+		slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+							 confirmed_flush_lsn);
+
+		ExecClearTuple(slot);
+	}
+
+	walrcv_clear_result(res);
+	pfree(database);
+
+	walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
+
+	/* Establish signal handlers. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	/* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+											  MyLogicalRepWorker->userid,
+											  0);
+
+	StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("replication slot synchronization worker for database \"%s\" has started",
+					get_database_name(MyLogicalRepWorker->dbid))));
+	CommitTransactionCommand();
+
+	/* Main wait loop. */
+	for (;;)
+	{
+		int			rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!RecoveryInProgress())
+			return;
+
+		if (strcmp(synchronize_slot_names, "") == 0)
+			return;
+
+		synchronize_slots();
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..5a98d2b699 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -156,7 +157,8 @@ finish_sync_worker(void)
 	CommitTransactionCommand();
 
 	/* Find the leader apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+	logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+							 MyLogicalRepWorker->subid, InvalidOid);
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -196,7 +198,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 
 		/* Check if the sync worker is still running and bail if not. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid, relid,
 										false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
@@ -243,7 +246,8 @@ wait_for_worker_state_change(char expected_state)
 		 * waiting.
 		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid,
 										InvalidOid, false);
 		if (worker && worker->proc)
 			logicalrep_worker_wakeup_ptr(worker);
@@ -509,7 +513,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 */
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+			syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+												MyLogicalRepWorker->subid,
 												rstate->relid, false);
 
 			if (syncworker)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd1cb..72f39ace7b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1589,7 +1589,8 @@ apply_handle_stream_start(StringInfo s)
 				 * Signal the leader apply worker, as it may be waiting for
 				 * us.
 				 */
-				logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+				logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+										 MyLogicalRepWorker->subid, InvalidOid);
 			}
 
 			parallel_stream_nchanges = 0;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..12a4b74368 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_SLOTS
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show list_slots
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	slot_name_list slot_name_list_opt
 
 %%
 
@@ -114,6 +116,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| list_slots
 			;
 
 /*
@@ -126,6 +129,33 @@ identify_system:
 				}
 			;
 
+slot_name_list:
+			IDENT
+				{
+					$$ = list_make1($1);
+				}
+			| slot_name_list ',' IDENT
+				{
+					$$ = lappend($1, $3);
+				}
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+		;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+			K_LIST_SLOTS slot_name_list_opt
+				{
+					ListSlotsCmd *cmd = makeNode(ListSlotsCmd);
+					cmd->slot_names = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 1cc7fb858c..11064feb86 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+LIST_SLOTS			{ return K_LIST_SLOTS; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
+		case K_LIST_SLOTS:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
 			return true;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..83ada6db6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..26d07ae549 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,194 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	NameData   *slot_names;
+	int			numslot_names;
+
+	numslot_names = list_length(cmd->slot_names);
+	if (numslot_names)
+	{
+		ListCell   *lc;
+		int			i = 0;
+
+		slot_names = palloc(numslot_names * sizeof(NameData));
+		foreach(lc, cmd->slot_names)
+		{
+			char	   *slot_name = lfirst(lc);
+
+			ReplicationSlotValidateName(slot_name, ERROR);
+			namestrcpy(&slot_names[i++], slot_name);
+		}
+
+		qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* need a tuple descriptor representing ten columns */
+	tupdesc = CreateTemplateTupleDesc(10);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+							  TEXTOID, -1, 0);
+
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+		char		restart_lsn_str[MAXFNAMELEN];
+		char		confirmed_flush_lsn_str[MAXFNAMELEN];
+		Datum		values[10];
+		bool		nulls[10];
+
+		ReplicationSlotPersistency persistency;
+		TransactionId xmin;
+		TransactionId catalog_xmin;
+		XLogRecPtr	restart_lsn;
+		XLogRecPtr	confirmed_flush_lsn;
+		Oid			datoid;
+		NameData	slot_name;
+		NameData	plugin;
+		int			i;
+		int64		tmpbigint;
+
+		if (!slot->in_use)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		xmin = slot->data.xmin;
+		catalog_xmin = slot->data.catalog_xmin;
+		datoid = slot->data.database;
+		restart_lsn = slot->data.restart_lsn;
+		confirmed_flush_lsn = slot->data.confirmed_flush;
+		namestrcpy(&slot_name, NameStr(slot->data.name));
+		namestrcpy(&plugin, NameStr(slot->data.plugin));
+		persistency = slot->data.persistency;
+
+		SpinLockRelease(&slot->mutex);
+
+		if (numslot_names &&
+			!bsearch((void *) &slot_name, (void *) slot_names,
+					 numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
+		memset(nulls, 0, sizeof(nulls));
+
+		i = 0;
+		values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+		if (datoid == InvalidOid)
+			values[i++] = CStringGetTextDatum("physical");
+		else
+			values[i++] = CStringGetTextDatum("logical");
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			tmpbigint = datoid;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			MemoryContext cur = CurrentMemoryContext;
+
+			/* syscache access needs a transaction env. */
+			StartTransactionCommand();
+			/* make dbname live outside TX context */
+			MemoryContextSwitchTo(cur);
+			values[i++] = CStringGetTextDatum(get_database_name(datoid));
+			CommitTransactionCommand();
+			/* CommitTransactionCommand switches to TopMemoryContext */
+			MemoryContextSwitchTo(cur);
+		}
+
+		values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+		if (xmin != InvalidTransactionId)
+		{
+			tmpbigint = xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (catalog_xmin != InvalidTransactionId)
+		{
+			tmpbigint = catalog_xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (restart_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+					 LSN_FORMAT_ARGS(restart_lsn));
+			values[i++] = CStringGetTextDatum(restart_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		if (confirmed_flush_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+					 "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+			values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		/* send it to dest */
+		do_tup_output(tstate, values, nulls);
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1819,6 +2007,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ListSlotsCmd:
+			cmdtag = "LIST_SLOTS";
+			set_ps_display(cmdtag);
+			ListSlots((ListSlotsCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
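
As a quick aside for reviewers: below is a minimal, hypothetical libpq client
showing how the new LIST_SLOTS command can be exercised over a replication
connection and how the ten columns built in ListSlots() come back. It is not
part of the patch; the connection string and the slot name "sub1" are just
assumptions for the example, and per the slot_name_list_opt grammar rule,
omitting slot names (or passing a comma-separated list) is also accepted.

/*
 * Illustrative only: issue LIST_SLOTS on a replication connection and print
 * a few of the ten columns returned by ListSlots().  The connection string
 * and the slot name "sub1" are assumptions for this sketch.
 */
#include <stdio.h>
#include "libpq-fe.h"

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres replication=database");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* With no slot names given, all slots are listed. */
	res = PQexec(conn, "LIST_SLOTS sub1");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		for (int row = 0; row < PQntuples(res); row++)
			printf("%s\t%s\t%s\n",
				   PQgetvalue(res, row, 0),	/* slot_name */
				   PQgetvalue(res, row, 1),	/* plugin */
				   PQgetvalue(res, row, 9));	/* confirmed_flush */
	}
	else
		fprintf(stderr, "LIST_SLOTS failed: %s", PQerrorMessage(conn));

	PQclear(res);
	PQfinish(conn);
	return 0;
}
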
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 3fabad96d9..7bfef97df1 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,7 @@ WAIT_EVENT_LOGICAL_APPLY_MAIN	LogicalApplyMain	"Waiting in main loop of logical
 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN	LogicalLauncherMain	"Waiting in main loop of logical replication launcher process."
 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN	LogicalParallelApplyMain	"Waiting in main loop of logical replication parallel apply process."
 WAIT_EVENT_RECOVERY_WAL_STREAM	RecoveryWalStream	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+WAIT_EVENT_REPL_SLOT_SYNC_MAIN	ReplSlotSyncMain	"Waiting in main loop of the worker that synchronizes replication slots from the primary to a standby."
 WAIT_EVENT_SYSLOGGER_MAIN	SysLoggerMain	"Waiting in main loop of syslogger process."
 WAIT_EVENT_WAL_RECEIVER_MAIN	WalReceiverMain	"Waiting in main loop of WAL receiver process."
 WAIT_EVENT_WAL_SENDER_MAIN	WalSenderMain	"Waiting in main loop of WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d72b6b95b6..131e32273c 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,11 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..0e77f9ee5c 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
 
 #include "catalog/objectaddress.h"
 #include "parser/parse_node.h"
+#include "replication/walreceiver.h"
 
 extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										bool isTopLevel);
@@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
 
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..980e0b2ee2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+	NodeTag		type;
+	List	   *slot_names;
+} ListSlotsCmd;
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index a07c9cb311..80fdbf9657 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -31,4 +31,6 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern PGDLLIMPORT char *PrimaryConnInfo;
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79f..4bb190ab81 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 2765f99ccf..1d44a64736 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
-#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -240,7 +239,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
@@ -250,4 +248,7 @@ extern void CheckSlotPermissions(void);
 
 extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
 #endif							/* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..9e9d64faf2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
 	}			proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information received from the remote node.
+ *
+ * Currently the same as ReplicationSlotPersistentData, plus last_sync_time.
+ */
+typedef struct WalRecvReplicationSlotData
+{
+	ReplicationSlotPersistentData persistent_data;
+	TimestampTz last_sync_time;
+} WalRecvReplicationSlotData;
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli);
 
+/*
+ * Fetch information about replication slots from the primary via LIST_SLOTS.
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_slots_fn walrcv_list_slots;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+	WalReceiverFunctions->walrcv_list_slots(conn, slots)
 #define walrcv_server_version(conn) \
 	WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
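
And a rough sketch, again not taken from the patch hunks above, of how a
standby-side caller might consume the new walrcv_list_slots() callback. It
assumes the returned List carries WalRecvReplicationSlotData entries, which
the struct definition above implies but the hunks here do not show.

/*
 * Illustrative sketch only: consuming walrcv_list_slots() on a standby.
 * The element type of the returned list and the DEBUG1 logging are
 * assumptions made for this example.
 */
#include "postgres.h"

#include "nodes/pg_list.h"
#include "replication/walreceiver.h"

static void
log_remote_slots(WalReceiverConn *wrconn, const char *slot_names)
{
	List	   *remote_slots = walrcv_list_slots(wrconn, slot_names);
	ListCell   *lc;

	foreach(lc, remote_slots)
	{
		WalRecvReplicationSlotData *slot_data = lfirst(lc);

		elog(DEBUG1, "remote slot \"%s\": confirmed_flush %X/%X",
			 NameStr(slot_data->persistent_data.name),
			 LSN_FORMAT_ARGS(slot_data->persistent_data.confirmed_flush));
	}

	list_free_deep(remote_slots);
}
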
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781896..d42cff3f2c 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -65,7 +65,7 @@ typedef struct LogicalRepWorker
 	 * would be created for each transaction which will be deleted after the
 	 * transaction is finished.
 	 */
-	FileSet    *stream_fileset;
+	struct FileSet    *stream_fileset;
 
 	/*
 	 * PID of leader apply worker if this slot is used for a parallel apply
@@ -228,15 +228,15 @@ extern PGDLLIMPORT bool in_remote_transaction;
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index ee590eeac7..ca043d2009 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -44,6 +44,7 @@ tests += {
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
       't/050_verify_slot_order.pl',
+      't/051_slot_sync.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/051_slot_sync.pl b/src/test/recovery/t/051_slot_sync.pl
new file mode 100644
index 0000000000..febe4e3db8
--- /dev/null
+++ b/src/test/recovery/t/051_slot_sync.pl
@@ -0,0 +1,132 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Check invalidation in the logfile
+sub check_for_invalidation
+{
+	my ($log_start, $test_name) = @_;
+
+	# message should be issued
+	ok( find_in_log(
+		$node_phys_standby,
+        "invalidating obsolete replication slot \"sub1\"", $log_start),
+        "sub1 slot invalidation is logged $test_name");
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my $res = $node_phys_standby->safe_psql(
+				'postgres', qq(
+				select bool_and(conflicting) from pg_replication_slots;));
+
+	is($res, 't',
+		"Logical slot is reported as conflicting");
+}
+
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+standby_slot_names = 'pslot1'
+});
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+# Some tests need to wait for VACUUM to be replayed, but VACUUM does not flush
+# WAL. An insert into flush_wal outside a transaction guarantees a flush.
+$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]);
+
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
+
+$node_primary->wait_for_catchup($node_phys_standby->name);
+
+# Logical subscriber and physical replica are caught up at this point.
+
+# Drop the subscription so that catalog_xmin is unknown on the primary
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+
+# This should trigger a conflict as hot_standby_feedback is off on the standby
+$node_primary->safe_psql('postgres', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+# Ensure physical replay catches up
+$node_primary->wait_for_catchup($node_phys_standby);
+
+# Check invalidation in the logfile
+check_for_invalidation(1, 'with vacuum FULL on pg_class');
+
+# Check conflicting status in pg_replication_slots.
+check_slots_conflicting_status();
+
+done_testing();
-- 
2.34.1
