Hi,

As Andres mentioned over in the minimal decoding on standby thread [1],
that functionality can be used to add a simple worker which periodically
synchronizes the slot state from the primary to a standby.

The attached patch is a rough implementation of such a worker. It's nowhere
near committable in its current state; it primarily serves two purposes: to
have something over which we can agree on the approach (and, if we do, to
serve as a base for that) and to demonstrate that the patch in [1] can
indeed be used for this functionality. All this means that this patch
depends on [1] to work.

The approach I chose is to change the logical replication launcher to
also run on a standby and to support a new type of worker which is
started on a standby for the slot synchronization. The new worker
(slotsync) is responsible for periodically fetching information from the
primary server and, based on that, moving slots on the standby forward
(using the fast forwarding functionality added in PG11). There is one
worker per database (logical slots are per database, walrcv_exec needs a
db connection, etc.). I had to add a new replication command for listing
slots so that the launcher can check which databases on the upstream
actually have slots and start slotsync workers only for those. The second
patch in the series just adds the ability to filter which slots are
actually synchronized.
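A minimal standalone sketch of the per-database decision the launcher makes over the LIST_SLOTS result (roughly what ApplyLauncherStartSlotSync in the first patch does): physical slots are skipped, and a worker is requested only for databases that own at least one logical slot upstream and do not already have a worker. The RemoteSlot struct and the databases_needing_worker() helper are hypothetical simplifications; the patch itself works on WalRecvReplicationSlotData and checks for running workers with logicalrep_worker_find().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Oid;
#define InvalidOid ((Oid) 0)

/* Hypothetical, simplified stand-in for one LIST_SLOTS result row. */
typedef struct RemoteSlot
{
	const char *name;
	Oid			database;		/* InvalidOid for physical slots */
} RemoteSlot;

/* Linear membership test; slot lists are expected to be small. */
static int
oid_in_list(const Oid *list, size_t n, Oid oid)
{
	for (size_t i = 0; i < n; i++)
		if (list[i] == oid)
			return 1;
	return 0;
}

/*
 * Collect the databases that still need a slotsync worker: each database
 * owning at least one logical slot upstream, minus databases that already
 * have a worker.  Writes at most max_out OIDs to out, returns the count.
 */
static size_t
databases_needing_worker(const RemoteSlot *slots, size_t nslots,
						 const Oid *running, size_t nrunning,
						 Oid *out, size_t max_out)
{
	size_t		nout = 0;

	assert(out != NULL || max_out == 0);

	for (size_t i = 0; i < nslots && nout < max_out; i++)
	{
		Oid			db = slots[i].database;

		if (db == InvalidOid)	/* physical slot: not synchronized */
			continue;
		if (oid_in_list(running, nrunning, db))
			continue;			/* worker already running */
		if (oid_in_list(out, nout, db))
			continue;			/* one worker per database */
		out[nout++] = db;
	}
	return nout;
}
```

Deduplicating by database, rather than starting one worker per slot, matches the "one worker per database" rule above, since each worker needs its own database connection.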

This approach should eventually be portable to logical replication as
well. The only difference there is that we need to be able to map LSNs
of the publisher to LSNs of the subscriber. We already do that in apply,
so that should be doable, but I don't have it as a goal for the first
version of the feature.
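As an illustration only: the apply worker already tracks pairs of remote and local end LSNs for flush reporting. One plausible way such pairs could translate a publisher slot's confirmed LSN into a subscriber LSN is to take the local end of the newest applied transaction whose remote end is at or before that LSN. The LsnMapping type and map_remote_lsn() below are hypothetical, and neither patch does this today.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical mapping entry: remote (publisher) end -> local end. */
typedef struct LsnMapping
{
	XLogRecPtr	remote_end;
	XLogRecPtr	local_end;
} LsnMapping;

/*
 * Return the local LSN of the newest mapped transaction whose remote end is
 * at or before remote_lsn, or InvalidXLogRecPtr if none qualifies.  The
 * entries must be sorted by remote_end.
 */
static XLogRecPtr
map_remote_lsn(const LsnMapping *map, size_t n, XLogRecPtr remote_lsn)
{
	size_t		lo = 0;
	size_t		hi = n;

	assert(map != NULL || n == 0);

	/* Binary search for the first entry with remote_end > remote_lsn. */
	while (lo < hi)
	{
		size_t		mid = lo + (hi - lo) / 2;

		if (map[mid].remote_end <= remote_lsn)
			lo = mid + 1;
		else
			hi = mid;
	}
	return lo == 0 ? InvalidXLogRecPtr : map[lo - 1].local_end;
}
```

Rounding down to a fully applied transaction keeps the subscriber-side slot conservative: it never claims more progress than the subscriber has actually made.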

The basic functionality seems to be working pretty well; however, there
are several discussion points and unfinished parts:

a) Do we want to automatically create and drop slots when they get
created or dropped on the primary? Currently the patch does auto-create
but does not auto-drop yet. There is no way to signal that a slot was
dropped, so I don't see a straightforward way to differentiate between
slots that have been dropped on the master and those that only exist on
the standby. I guess if we added the second feature with a slot list as
well, we could drop anything on that list that's not on the primary...
b) The slot creation is somewhat interesting. The slot might be created
while the standby no longer has WAL for existing slots on the primary,
because those slots are behind the standby. We solve it by creating an
ephemeral slot and waiting for the primary slot to pass its LSN before
persisting it (similarly to what we do when building the initial
snapshot). This seems reasonable to me, but the coding could use another
pair of eyes there.
c) With decoding being periodically started and stopped on the slot (to
move it forward), logging every start of a decoding context is pretty
annoying/spammy; we should probably tone that down.
d) The launcher integration needs improvement: add a worker kind and base
decisions on it, rather than guessing from the values of dbid, subid and
relid. Also, the interfaces for manipulating the workers should probably
take a LogicalRepWorkerId rather than the above-mentioned parameters,
instead of guessing everywhere.
e) We probably should support synchronizing physical slots as well
(currently we only sync logical slots). But that should be easy, provided
we don't mind that the logical replication launcher is then somewhat of a
misnomer...
f) Maybe the walreceiver or startup process should signal these new
workers once enough data has been processed, so that it's not purely time
based. But I think that kind of optimization can be left for later.
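For point a), a minimal sketch of the auto-drop rule suggested there, assuming an explicit list of slot names to synchronize: a local slot is dropped only if it is on that list and no longer exists on the primary, so standby-only slots outside the list are never touched. The should_drop_local_slot() helper is hypothetical; it deliberately ignores the "*" setting, since with "*" standby-only slots still cannot be told apart from dropped ones.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Linear search for a slot name in a list of names. */
static bool
name_in(const char *name, const char *const *list, size_t n)
{
	for (size_t i = 0; i < n; i++)
		if (strcmp(list[i], name) == 0)
			return true;
	return false;
}

/*
 * Hypothetical auto-drop rule: drop a local slot only if it was explicitly
 * listed for synchronization and is absent from the primary's slot list.
 */
static bool
should_drop_local_slot(const char *local_name,
					   const char *const *sync_names, size_t nsync,
					   const char *const *remote_names, size_t nremote)
{
	assert(local_name != NULL);

	return name_in(local_name, sync_names, nsync) &&
		!name_in(local_name, remote_names, nremote);
}
```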
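For point b), a standalone sketch of the create-ephemeral-then-persist flow. It polls the primary until its slot has reached the position reserved locally and only then marks the slot persistent. In the backend, that last step would correspond to ReplicationSlotPersist(). Everything here is hypothetical: LocalSlot, the fetch callback, max_polls, the FakePrimary simulator, and the choice of ">=" as "passed".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

typedef enum SlotPersistency
{
	SLOT_EPHEMERAL,
	SLOT_PERSISTENT
} SlotPersistency;

/* Hypothetical minimal view of the local slot being created. */
typedef struct LocalSlot
{
	XLogRecPtr	reserved_lsn;	/* WAL position reserved on the standby */
	SlotPersistency persistency;
} LocalSlot;

/* Hypothetical callback returning the primary slot's current LSN. */
typedef XLogRecPtr (*fetch_remote_lsn_fn) (void *arg);

/*
 * Poll until the primary slot reaches the locally reserved LSN, then
 * persist.  Returns false if max_polls runs out; the slot then stays
 * ephemeral, so releasing it would discard it.
 */
static bool
wait_and_persist(LocalSlot *slot, fetch_remote_lsn_fn fetch, void *arg,
				 int max_polls)
{
	assert(slot->persistency == SLOT_EPHEMERAL);

	for (int i = 0; i < max_polls; i++)
	{
		if (fetch(arg) >= slot->reserved_lsn)
		{
			slot->persistency = SLOT_PERSISTENT;
			return true;
		}
		/* A real worker would sleep on its latch here between polls. */
	}
	return false;
}

/* Simulated primary for illustration: advances by `step` on each fetch. */
typedef struct FakePrimary
{
	XLogRecPtr	lsn;
	XLogRecPtr	step;
} FakePrimary;

static XLogRecPtr
fake_fetch(void *arg)
{
	FakePrimary *p = arg;
	XLogRecPtr	cur = p->lsn;

	p->lsn += p->step;
	return cur;
}
```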
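For point d), a sketch of what an explicit worker kind could look like. The enum, the kind field in LogicalRepWorkerId, and the helper functions are hypothetical. worker_kind_from_oids() makes the inference the current patch does implicitly (no subid means slotsync, a valid relid means tablesync), so that it happens once instead of at every call site.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(oid) ((oid) != InvalidOid)

/* Hypothetical explicit worker kind. */
typedef enum LogicalRepWorkerKind
{
	WORKERKIND_APPLY,
	WORKERKIND_TABLESYNC,
	WORKERKIND_SLOTSYNC
} LogicalRepWorkerKind;

typedef struct LogicalRepWorkerId
{
	LogicalRepWorkerKind kind;
	Oid			dbid;
	Oid			subid;			/* InvalidOid for slotsync workers */
	Oid			relid;			/* valid only for tablesync workers */
} LogicalRepWorkerId;

/* The inference done implicitly today, performed in one place. */
static LogicalRepWorkerKind
worker_kind_from_oids(Oid subid, Oid relid)
{
	if (!OidIsValid(subid))
		return WORKERKIND_SLOTSYNC;
	return OidIsValid(relid) ? WORKERKIND_TABLESYNC : WORKERKIND_APPLY;
}

/* Background worker entry point chosen from the kind. */
static const char *
worker_function_name(const LogicalRepWorkerId *id)
{
	switch (id->kind)
	{
		case WORKERKIND_SLOTSYNC:
			return "ReplSlotSyncMain";
		case WORKERKIND_APPLY:
		case WORKERKIND_TABLESYNC:
			return "ApplyWorkerMain";
	}
	assert(!"unknown worker kind");
	return NULL;
}

/* Identity comparison that interfaces could use instead of OID triples. */
static int
worker_id_equal(const LogicalRepWorkerId *a, const LogicalRepWorkerId *b)
{
	return a->kind == b->kind && a->dbid == b->dbid &&
		a->subid == b->subid && a->relid == b->relid;
}
```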

Also (these are pretty pointless until we agree that this is the right
approach):
- there is no documentation update yet
- there are no TAP tests yet
- the recheck timer might need a GUC

[1]
https://www.postgresql.org/message-id/20181212204154.nsxf3gzqv3ges...@alap3.anarazel.de

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 719ac03dd963502381fbc210947faae472b4a95a Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sun, 30 Dec 2018 02:07:26 +0100
Subject: [PATCH 2/2] Add option for filtering which slots get synchronized

---
 .../libpqwalreceiver/libpqwalreceiver.c       |  32 +++++-
 src/backend/replication/logical/launcher.c    |   3 +-
 src/backend/replication/logical/slotsync.c    | 104 +++++++++++++++++-
 src/backend/replication/repl_gram.y           |  23 +++-
 src/backend/replication/walsender.c           |  37 ++++++-
 src/backend/utils/misc/guc.c                  |  12 ++
 src/include/nodes/replnodes.h                 |   1 +
 src/include/replication/logicalworker.h       |   9 ++
 src/include/replication/walreceiver.h         |   6 +-
 src/include/replication/worker_internal.h     |   4 +
 10 files changed, 216 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 3c77a76ea6..f564f288d0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -59,7 +59,8 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 						 TimeLineID *primary_tli,
 						 int *server_version);
-static List *libpqrcv_list_slots(WalReceiverConn *conn);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, int nslot_names,
+					NameData *slot_names);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 								 TimeLineID tli, char **filename,
 								 char **content, int *len);
@@ -355,15 +356,34 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
  * Get list of slots from primary.
  */
 static List *
-libpqrcv_list_slots(WalReceiverConn *conn)
+libpqrcv_list_slots(WalReceiverConn *conn, int nslot_names,
+					NameData *slot_names)
 {
 	PGresult   *res;
 	int			i;
-	List	   *slots = NIL;
+	List	   *slotlist = NIL;
 	int			ntuples;
+	StringInfoData	s;
 	WalRecvReplicationSlotData *slot_data;
 
-	res = libpqrcv_PQexec(conn->streamConn, "LIST_SLOTS");
+	initStringInfo(&s);
+	appendStringInfoString(&s, "LIST_SLOTS");
+	if (nslot_names > 0)
+	{
+		int				i;
+
+		appendStringInfoChar(&s, ' ');
+		for (i = 0; i < nslot_names; i++)
+		{
+			if (i > 0)
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(NameStr(slot_names[i])));
+		}
+	}
+
+	res = libpqrcv_PQexec(conn->streamConn, s.data);
+	pfree(s.data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -414,12 +434,12 @@ libpqrcv_list_slots(WalReceiverConn *conn)
 			slot_data->confirmed_flush = pg_strtouint64(PQgetvalue(res, i, 9),
 														NULL, 10);
 
-		slots = lappend(slots, slot_data);
+		slotlist = lappend(slotlist, slot_data);
 	}
 
 	PQclear(res);
 
-	return slots;
+	return slotlist;
 }
 
 /*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index c47d4ee403..161205d9e4 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1005,7 +1005,8 @@ ApplyLauncherStartSlotSync(TimestampTz *last_start_time, long *wait_time)
 									ALLOCSET_DEFAULT_SIZES);
 	oldctx = MemoryContextSwitchTo(tmpctx);
 
-	slots = walrcv_list_slots(wrconn);
+	slots = walrcv_list_slots(wrconn, numsynchronize_slot_names,
+							  synchronize_slot_names);
 
 	now = GetCurrentTimestamp();
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index eda9ad0add..07f06a8fe9 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -26,7 +26,11 @@
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/pg_lsn.h"
+#include "utils/varlena.h"
 
+char	*synchronize_slot_names_string;
+NameData *synchronize_slot_names;
+int numsynchronize_slot_names;
 
 /*
  * Wait for remote slot to pass locally reserved position.
@@ -215,12 +219,26 @@ synchronize_slots(void)
 				(errmsg("could not connect to the primary server: %s", err)));
 
 	resetStringInfo(&s);
-	/* TODO filter slot names? */
 	appendStringInfo(&s,
 					 "SELECT slot_name, plugin, confirmed_flush_lsn"
 					 "  FROM pg_catalog.pg_replication_slots"
 					 " WHERE database = %s",
 					 quote_literal_cstr(database));
+	if (numsynchronize_slot_names > 0)
+	{
+		int				i;
+
+		appendStringInfoString(&s, " AND slot_name IN (");
+		for (i = 0; i < numsynchronize_slot_names; i++)
+		{
+			if (i > 0)
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+					quote_literal_cstr(NameStr(synchronize_slot_names[i])));
+		}
+		appendStringInfoChar(&s, ')');
+	}
+
 	res = walrcv_exec(wrconn, s.data, 3, slotRow);
 	pfree(s.data);
 
@@ -300,6 +318,9 @@ ReplSlotSyncMain(Datum main_arg)
 		if (!RecoveryInProgress())
 			return;
 
+		if (numsynchronize_slot_names == 0)
+			return;
+
 		synchronize_slots();
 
 		rc = WaitLatch(MyLatch,
@@ -314,3 +335,84 @@ ReplSlotSyncMain(Datum main_arg)
 			proc_exit(1);
 	}
 }
+
+/*
+ * Routines for handling the GUC variable(s)
+ */
+
+typedef struct
+{
+	int			numslots;
+	NameData	slots[FLEXIBLE_ARRAY_MEMBER];
+} synchronize_slot_names_extra;
+
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+	char	   *rawname;
+	List	   *namelist;
+	ListCell   *lc;
+	synchronize_slot_names_extra *myextra;
+	NameData   *slots;
+	int			numslots;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawname, ',', &namelist))
+	{
+		/* syntax error in name list */
+		GUC_check_errdetail("List syntax is invalid.");
+		pfree(rawname);
+		list_free(namelist);
+		return false;
+	}
+
+	/* temporary workspace until we are done verifying the list */
+	slots = (NameData *) palloc(list_length(namelist) * sizeof(NameData));
+	numslots = 0;
+	foreach(lc, namelist)
+	{
+		char	   *curname = (char *) lfirst(lc);
+
+		/* Special handling for "*" which means all. */
+		if (strcmp(curname, "*") == 0)
+		{
+			numslots = -1;
+			break;
+		}
+
+		/* For any other value, validate slot name. */
+		ReplicationSlotValidateName(curname, ERROR);
+
+		/* And add it to our array. */
+		namestrcpy(&slots[numslots++], curname);
+	}
+
+	/* Now prepare an "extra" struct for assign_synchronize_slot_names */
+	myextra = malloc(offsetof(synchronize_slot_names_extra, slots) +
+					 (numslots > 0 ? numslots * sizeof(NameData) : 0));
+	if (!myextra)
+		return false;
+	myextra->numslots = numslots;
+	if (numslots > 0)
+		memcpy(myextra->slots, slots, numslots * sizeof(NameData));
+	*extra = (void *) myextra;
+
+	pfree(slots);
+	pfree(rawname);
+	list_free(namelist);
+
+	return true;
+}
+
+void
+assign_synchronize_slot_names(const char *newval, void *extra)
+{
+	synchronize_slot_names_extra *myextra =
+		(synchronize_slot_names_extra *) extra;
+
+	synchronize_slot_names = myextra->slots;
+	numsynchronize_slot_names = myextra->numslots;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 95d91bf236..f0988aed1c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -103,6 +103,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>	opt_temporary
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
+%type <list>	slot_name_list slot_name_list_opt
 
 %%
 
@@ -138,13 +139,31 @@ identify_system:
 					$$ = (Node *) makeNode(IdentifySystemCmd);
 				}
 			;
+
+slot_name_list:
+			IDENT
+				{
+					$$ = list_make1($1);
+				}
+			| slot_name_list ',' IDENT
+				{
+					$$ = lappend($1, $3);
+				}
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+		;
+
 /*
  * LIST_SLOTS
  */
 list_slots:
-			K_LIST_SLOTS
+			K_LIST_SLOTS slot_name_list_opt
 				{
-					$$ = (Node *) makeNode(ListSlotsCmd);
+					ListSlotsCmd *cmd = makeNode(ListSlotsCmd);
+					cmd->slot_names = $2;
+					$$ = (Node *) cmd;
 				}
 			;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 6ca304b8f5..bb9cec46a8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -426,16 +426,41 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name)a), NameStr(*(Name)b), NAMEDATALEN);
+}
 /*
  * Handle the LIST_SLOTS command.
  */
 static void
-ListSlots(void)
+ListSlots(ListSlotsCmd *cmd)
 {
 	DestReceiver *dest;
 	TupOutputState *tstate;
 	TupleDesc	tupdesc;
 	int			slotno;
+	NameData   *slot_names = NULL;
+	int			numslot_names;
+
+	numslot_names = list_length(cmd->slot_names);
+	if (numslot_names)
+	{
+		ListCell *lc;
+		int		  i = 0;
+
+		slot_names = palloc(numslot_names * sizeof(NameData));
+		foreach (lc, cmd->slot_names)
+		{
+			char *slot_name = lfirst(lc);
+
+			ReplicationSlotValidateName(slot_name, ERROR);
+			namestrcpy(&slot_names[i++], slot_name);
+		}
+
+		qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+	}
 
 	dest = CreateDestReceiver(DestRemoteSimple);
 
@@ -501,6 +526,11 @@ ListSlots(void)
 
 		SpinLockRelease(&slot->mutex);
 
+		if (numslot_names &&
+			!bsearch((void *) &slot_name, (void *) slot_names,
+					 numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
 		memset(nulls, 0, sizeof(nulls));
 
 		i = 0;
@@ -1712,7 +1742,10 @@ exec_replication_command(const char *cmd_string)
 			break;
 
 		case T_ListSlotsCmd:
-			ListSlots();
+			{
+				ListSlotsCmd *cmd = (ListSlotsCmd *) cmd_node;
+				ListSlots(cmd);
+			}
 			break;
 
 		case T_SQLCmd:
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6fe1939881..338232920b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -64,6 +64,7 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
@@ -4078,6 +4079,17 @@ static struct config_string ConfigureNamesString[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the names of replication slots to synchronize from the primary to the standby."),
+			gettext_noop("Value of \"*\" means all."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&synchronize_slot_names_string,
+		"",
+		check_synchronize_slot_names, assign_synchronize_slot_names, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1101f71bcc..71f8f8ca8e 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -40,6 +40,7 @@ typedef struct IdentifySystemCmd
 typedef struct ListSlotsCmd
 {
 	NodeTag		type;
+	List	   *slot_names;
 } ListSlotsCmd;
 
 /* ----------------------
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 6379aae7ce..1b09a1fb3e 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -12,8 +12,17 @@
 #ifndef LOGICALWORKER_H
 #define LOGICALWORKER_H
 
+#include "utils/guc.h"
+
+extern char *synchronize_slot_names_string;
+
 extern void ApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source);
+extern void assign_synchronize_slot_names(const char *newval, void *extra);
+
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9014f92bfa..2593af2291 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -219,7 +219,7 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli,
 											int *server_version);
-typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn);
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, int nslots, NameData *slots);
 typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
 												   TimeLineID tli,
 												   char **filename,
@@ -272,8 +272,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli, server_version) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
-#define walrcv_list_slots(conn) \
-	WalReceiverFunctions->walrcv_list_slots(conn)
+#define walrcv_list_slots(conn, nslots, slots) \
+	WalReceiverFunctions->walrcv_list_slots(conn, nslots, slots)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
 	WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
 #define walrcv_startstreaming(conn, options) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index ba2d3f1fca..c2373fea47 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -68,6 +68,10 @@ extern LogicalRepWorker *MyLogicalRepWorker;
 
 extern bool in_remote_transaction;
 
+/* Translated GUCs we share with the launcher and worker. */
+extern NameData *synchronize_slot_names;
+extern int numsynchronize_slot_names;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
 					   bool only_running);
-- 
2.17.1

From 87fa57753cb667ebcabee48b3fead835de72d6e1 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sun, 30 Dec 2018 02:07:12 +0100
Subject: [PATCH 1/2] Synchronize logical replication slots from primary to
 standby

---
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/bgworker.c             |   3 +
 src/backend/postmaster/pgstat.c               |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  74 ++++
 src/backend/replication/logical/Makefile      |   3 +-
 src/backend/replication/logical/launcher.c    | 207 ++++++++----
 src/backend/replication/logical/slotsync.c    | 316 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  12 +-
 src/backend/replication/repl_gram.y           |  13 +-
 src/backend/replication/repl_scanner.l        |   1 +
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           | 164 +++++++++
 src/include/nodes/nodes.h                     |   1 +
 src/include/nodes/replnodes.h                 |   8 +
 src/include/pgstat.h                          |   1 +
 src/include/replication/slot.h                |   3 +
 src/include/replication/walreceiver.h         |  12 +
 src/include/replication/worker_internal.h     |   8 +-
 18 files changed, 764 insertions(+), 71 deletions(-)
 create mode 100644 src/backend/replication/logical/slotsync.c

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9021463a4c..bec6d11457 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -598,7 +598,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(MyDatabaseId, sub->oid, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -940,7 +940,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
+		logicalrep_worker_stop(w->dbid, w->subid, w->relid);
 	}
 	list_free(subworkers);
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index d2b695e146..3b2a5d04cd 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"ReplSlotSyncMain", ReplSlotSyncMain
 	}
 };
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8676088e57..5387f4f2fa 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3494,6 +3494,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
 			event_name = "LogicalLauncherMain";
 			break;
+		case WAIT_EVENT_REPL_SLOT_SYNC_MAIN:
+			event_name = "ReplSlotSyncMain";
+			break;
 		case WAIT_EVENT_PGSTAT_MAIN:
 			event_name = "PgStatMain";
 			break;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9b75711ebd..3c77a76ea6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,6 +23,7 @@
 #include "pqexpbuffer.h"
 #include "access/xlog.h"
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 						 TimeLineID *primary_tli,
 						 int *server_version);
+static List *libpqrcv_list_slots(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 								 TimeLineID tli, char **filename,
 								 char **content, int *len);
@@ -86,6 +88,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_get_conninfo,
 	libpqrcv_get_senderinfo,
 	libpqrcv_identify_system,
+	libpqrcv_list_slots,
 	libpqrcv_readtimelinehistoryfile,
 	libpqrcv_startstreaming,
 	libpqrcv_endstreaming,
@@ -348,6 +351,77 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
 	return primary_sysid;
 }
 
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn)
+{
+	PGresult   *res;
+	int			i;
+	List	   *slots = NIL;
+	int			ntuples;
+	WalRecvReplicationSlotData *slot_data;
+
+	res = libpqrcv_PQexec(conn->streamConn, "LIST_SLOTS");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive list of slots from the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	}
+	if (PQnfields(res) < 10)
+	{
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+						   nfields, 10)));
+	}
+
+	ntuples = PQntuples(res);
+	for (i = 0; i < ntuples; i++)
+	{
+		char   *slot_type;
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+		namestrcpy(&slot_data->name, PQgetvalue(res, i, 0));
+		if (!PQgetisnull(res, i, 1))
+			namestrcpy(&slot_data->plugin, PQgetvalue(res, i, 1));
+		slot_type = PQgetvalue(res, i, 2);
+		if (!PQgetisnull(res, i, 3))
+			slot_data->database = atooid(PQgetvalue(res, i, 3));
+		if (strcmp(slot_type, "physical") == 0)
+		{
+			if (OidIsValid(slot_data->database))
+				elog(ERROR, "unexpected physical replication slot with database set");
+		}
+		if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+			slot_data->persistency = RS_TEMPORARY;
+		else
+			slot_data->persistency = RS_PERSISTENT;
+		if (!PQgetisnull(res, i, 6))
+			slot_data->xmin = atooid(PQgetvalue(res, i, 6));
+		if (!PQgetisnull(res, i, 7))
+			slot_data->catalog_xmin = atooid(PQgetvalue(res, i, 7));
+		if (!PQgetisnull(res, i, 8))
+			slot_data->restart_lsn = pg_strtouint64(PQgetvalue(res, i, 8),
+													NULL, 10);
+		if (!PQgetisnull(res, i, 9))
+			slot_data->confirmed_flush = pg_strtouint64(PQgetvalue(res, i, 9),
+														NULL, 10);
+
+		slots = lappend(slots, slot_data);
+	}
+
+	PQclear(res);
+
+	return slots;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index bb417b042e..385782bd67 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = decode.o launcher.o logical.o logicalfuncs.o message.o origin.o \
-	   proto.o relation.o reorderbuffer.o snapbuild.o tablesync.o worker.o
+	   proto.o relation.o reorderbuffer.o slotsync.o snapbuild.o \
+	   tablesync.o worker.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3a84d8ca86..c47d4ee403 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -26,6 +26,7 @@
 #include "access/htup_details.h"
 #include "access/xact.h"
 
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 
@@ -75,6 +76,7 @@ LogicalRepCtxStruct *LogicalRepCtx;
 
 typedef struct LogicalRepWorkerId
 {
+	Oid			dbid;
 	Oid			subid;
 	Oid			relid;
 } LogicalRepWorkerId;
@@ -239,7 +241,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * subscription id and relid.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
 {
 	int			i;
 	LogicalRepWorker *res = NULL;
@@ -251,8 +253,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->in_use && w->subid == subid && w->relid == relid &&
-			(!only_running || w->proc))
+		if (w->in_use && w->dbid == dbid && w->subid == subid &&
+			w->relid == relid && (!only_running || w->proc))
 		{
 			res = w;
 			break;
@@ -302,9 +304,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	TimestampTz now;
 
-	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+	if (OidIsValid(subid))
+		ereport(DEBUG1,
+				(errmsg("starting logical replication worker for subscription \"%s\"",
+						subname)));
+	else
+		ereport(DEBUG1,
+				(errmsg("starting replication slot synchronization worker")));
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -341,7 +347,9 @@ retry:
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL ||
+		(OidIsValid(relid) &&
+		 nsyncworkers >= max_sync_workers_per_subscription))
 	{
 		bool		did_cleanup = false;
 
@@ -375,7 +383,7 @@ retry:
 	 * silently as we might get here because of an otherwise harmless race
 	 * condition.
 	 */
-	if (nsyncworkers >= max_sync_workers_per_subscription)
+	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return;
@@ -421,15 +429,22 @@ retry:
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
-	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+	if (OidIsValid(subid))
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+	else
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
 	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication worker for subscription %u sync %u", subid, relid);
-	else
+	else if (OidIsValid(subid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication worker for subscription %u", subid);
+	else
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "replication slot synchronization worker");
+
 	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
@@ -460,14 +475,14 @@ retry:
  * it detaches from the slot.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 	uint16		generation;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, false);
+	worker = logicalrep_worker_find(dbid, subid, relid, false);
 
 	/* No worker, nothing to do. */
 	if (!worker)
@@ -557,7 +572,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
  * Request worker for specified sub/rel to be stopped on commit.
  */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_worker_stop_at_commit(Oid dbid, Oid subid, Oid relid)
 {
 	int			nestDepth = GetCurrentTransactionNestLevel();
 	LogicalRepWorkerId *wid;
@@ -590,6 +605,7 @@ logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 	 * subtransaction.
 	 */
 	wid = palloc(sizeof(LogicalRepWorkerId));
+	wid->dbid = dbid;
 	wid->subid = subid;
 	wid->relid = relid;
 	on_commit_stop_workers->workers =
@@ -602,13 +618,13 @@ logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, true);
+	worker = logicalrep_worker_find(dbid, subid, relid, true);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
@@ -795,7 +811,7 @@ ApplyLauncherRegister(void)
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -873,7 +889,7 @@ AtEOXact_ApplyLauncher(bool isCommit)
 			{
 				LogicalRepWorkerId *wid = lfirst(lc);
 
-				logicalrep_worker_stop(wid->subid, wid->relid);
+				logicalrep_worker_stop(wid->dbid, wid->subid, wid->relid);
 			}
 		}
 
@@ -964,6 +980,115 @@ ApplyLauncherWakeup(void)
 		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+static void
+ApplyLauncherStartSlotSync(TimestampTz *last_start_time, long *wait_time)
+{
+	TimestampTz now;
+	char	   *err;
+	List	   *slots;
+	ListCell   *lc;
+	MemoryContext tmpctx;
+	MemoryContext oldctx;
+
+	if (numsynchronize_slot_names == 0)
+		return;
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	/* Use temporary context for the slot list and worker info. */
+	tmpctx = AllocSetContextCreate(TopMemoryContext,
+									"Logical Replication Launcher slot sync ctx",
+									ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(tmpctx);
+
+	slots = walrcv_list_slots(wrconn);
+
+	now = GetCurrentTimestamp();
+
+	foreach (lc, slots)
+	{
+		WalRecvReplicationSlotData *slot_data = lfirst(lc);
+		LogicalRepWorker *w;
+
+		if (!OidIsValid(slot_data->database))
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(slot_data->database, InvalidOid,
+								   InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w == NULL)
+		{
+			*last_start_time = now;
+			*wait_time = wal_retrieve_retry_interval;
+
+			logicalrep_worker_launch(slot_data->database, InvalidOid, NULL,
+									 BOOTSTRAP_SUPERUSERID, InvalidOid);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(tmpctx);
+
+	walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(TimestampTz *last_start_time, long *wait_time)
+{
+	TimestampTz now;
+	List	   *sublist;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	now = GetCurrentTimestamp();
+
+	/* Use temporary context for the database list and worker info. */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher sublist",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* search for subscriptions to start or stop. */
+	sublist = get_subscription_list();
+
+	/* Start the missing workers for enabled subscriptions. */
+	foreach(lc, sublist)
+	{
+		Subscription *sub = (Subscription *) lfirst(lc);
+		LogicalRepWorker *w;
+
+		if (!sub->enabled)
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w == NULL)
+		{
+			*last_start_time = now;
+			*wait_time = wal_retrieve_retry_interval;
+
+			logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+									 sub->owner, InvalidOid);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -991,14 +1116,12 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	load_file("libpqwalreceiver", false);
+
 	/* Enter main loop */
 	for (;;)
 	{
 		int			rc;
-		List	   *sublist;
-		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -1010,42 +1133,10 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
-
-				if (!sub->enabled)
-					continue;
-
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
-
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
-
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
-				}
-			}
-
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			if (!RecoveryInProgress())
+				ApplyLauncherStartSubs(&last_start_time, &wait_time);
+			else
+				ApplyLauncherStartSlotSync(&last_start_time, &wait_time);
 		}
 		else
 		{
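On a standby, the reworked launcher loop calls ApplyLauncherStartSlotSync(), which lists the slots on the primary and starts one slotsync worker for each database that has a logical slot there and no worker yet. The standalone sketch below shows only that selection step. RemoteSlot and databases_needing_worker() are hypothetical names. Plain arrays stand in for the LIST_SLOTS result and for logicalrep_worker_find(). The sketch also assumes that a worker chosen earlier in the same pass is visible to later lookups. The patch appears to rely on this, because logicalrep_worker_launch() claims a shared-memory worker entry before the next slot is examined.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Hypothetical, simplified view of one row returned by LIST_SLOTS. */
typedef struct
{
    const char *slot_name;
    Oid         database;       /* InvalidOid for physical slots */
} RemoteSlot;

/*
 * Hypothetical helper: write to "out" each database that has at least one
 * logical slot on the primary and no slotsync worker yet (neither already
 * running nor picked earlier in this pass).  Returns the number written.
 */
static size_t
databases_needing_worker(const RemoteSlot *slots, size_t nslots,
                         const Oid *running, size_t nrunning,
                         Oid *out, size_t outcap)
{
    size_t      nout = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        Oid         db = slots[i].database;
        bool        skip = (db == InvalidOid);  /* physical slot */

        for (size_t j = 0; !skip && j < nrunning; j++)
            skip = (running[j] == db);  /* worker already running */
        for (size_t j = 0; !skip && j < nout; j++)
            skip = (out[j] == db);      /* already chosen this pass */

        if (!skip && nout < outcap)
            out[nout++] = db;
    }
    return nout;
}
```

Physical slots are skipped because only logical slots are decoded, so only they need a per-database worker.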
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..eda9ad0add
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,316 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *	   PostgreSQL worker for synchronizing slots to a standby from the primary
+ *
+ * Copyright (c) 2016-2018, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "catalog/pg_type_d.h"
+#include "commands/dbcommands.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicalworker.h"
+#include "replication/slot.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+							  XLogRecPtr min_lsn)
+{
+	WalRcvExecResult *res;
+	TupleTableSlot *slot;
+	Oid				slotRow[1] = {LSNOID};
+	StringInfoData	cmd;
+	bool			isnull;
+	XLogRecPtr		restart_lsn;
+
+	for (;;)
+	{
+		int	rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT restart_lsn"
+						 "  FROM pg_catalog.pg_replication_slots"
+						 " WHERE slot_name = %s",
+						 quote_literal_cstr(slot_name));
+		res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+							slot_name, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+			ereport(ERROR,
+					(errmsg("slot \"%s\" disappeared from the primary",
+							slot_name)));
+
+		restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+
+		if (restart_lsn >= min_lsn)
+			break;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot first if one does not already exist.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+					 char *plugin_name, XLogRecPtr target_lsn)
+{
+	int			i;
+	bool		found = false;
+	XLogRecPtr	endlsn;
+
+	/* Search for the named slot; it is acquired below if found. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (strcmp(NameStr(s->data.name), slot_name) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	StartTransactionCommand();
+
+	/* Already existing slot, acquire */
+	if (found)
+	{
+		ReplicationSlotAcquire(slot_name, true);
+
+		if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+		{
+			elog(DEBUG1,
+				 "not synchronizing slot %s; synchronization would move it backward",
+				 slot_name);
+
+			ReplicationSlotRelease();
+			CommitTransactionCommand();
+			return;
+		}
+	}
+	/* Otherwise create the slot first. */
+	else
+	{
+		TransactionId xmin_horizon = InvalidTransactionId;
+		ReplicationSlot	   *slot;
+
+		ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL);
+		slot = MyReplicationSlot;
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.database = get_database_oid(database, false);
+		StrNCpy(NameStr(slot->data.plugin), plugin_name, NAMEDATALEN);
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotReserveWal();
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		ReplicationSlotsComputeRequiredXmin(true);
+		LWLockRelease(ProcArrayLock);
+
+		if (target_lsn < MyReplicationSlot->data.restart_lsn)
+		{
+			elog(LOG, "waiting for remote slot %s lsn (%X/%X) to pass local slot lsn (%X/%X)",
+				 slot_name,
+				 (uint32) (target_lsn >> 32),
+				 (uint32) (target_lsn),
+				 (uint32) (MyReplicationSlot->data.restart_lsn >> 32),
+				 (uint32) (MyReplicationSlot->data.restart_lsn));
+
+			wait_for_primary_slot_catchup(wrconn, slot_name,
+										  MyReplicationSlot->data.restart_lsn);
+		}
+
+		ReplicationSlotPersist();
+	}
+
+	endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+	elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+		 slot_name, (uint32) (endlsn >> 32), (uint32) (endlsn));
+
+	ReplicationSlotRelease();
+	CommitTransactionCommand();
+}
+
+static void
+synchronize_slots(void)
+{
+	WalRcvExecResult *res;
+	WalReceiverConn *wrconn = NULL;
+	TupleTableSlot *slot;
+	Oid				slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+	StringInfoData	s;
+	char		   *database;
+	char		   *err;
+	MemoryContext	oldctx = CurrentMemoryContext;
+
+	if (!WalRcv)
+		return;
+
+	/* syscache access needs a transaction env. */
+	StartTransactionCommand();
+	/* make dbname live outside TX context */
+	MemoryContextSwitchTo(oldctx);
+
+	database = get_database_name(MyDatabaseId);
+	initStringInfo(&s);
+	appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+	wrconn = walrcv_connect(s.data, true, "slot_sync", &err);
+
+	if (wrconn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	resetStringInfo(&s);
+	/* TODO filter slot names? */
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn"
+					 "  FROM pg_catalog.pg_replication_slots"
+					 " WHERE database = %s",
+					 quote_literal_cstr(database));
+	res = walrcv_exec(wrconn, s.data, 3, slotRow);
+	pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch slot info from primary: %s",
+						res->err)));
+
+	CommitTransactionCommand();
+	/* CommitTransactionCommand switches to TopMemoryContext */
+	MemoryContextSwitchTo(oldctx);
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *slot_name;
+		char	   *plugin_name;
+		XLogRecPtr	confirmed_flush_lsn;
+		bool		isnull;
+
+		slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+							 confirmed_flush_lsn);
+
+		ExecClearTuple(slot);
+	}
+
+	walrcv_clear_result(res);
+	pfree(database);
+
+	walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
+
+	/* Establish signal handlers. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	/* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+											  MyLogicalRepWorker->userid,
+											  0);
+
+	StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("replication slot synchronization worker for database \"%s\" has started",
+					get_database_name(MyLogicalRepWorker->dbid))));
+	CommitTransactionCommand();
+
+	/* Main wait loop. */
+	for (;;)
+	{
+		int		rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (!RecoveryInProgress())
+			return;
+
+		synchronize_slots();
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
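The per-slot logic in synchronize_one_slot() comes down to a small decision on LSNs. The sketch below isolates that decision as a pure function so each case can be checked. SyncAction and decide_sync_action() are hypothetical names and are not part of the patch.

- For an existing slot, a target behind the local confirmed_flush is skipped, because syncing would move the slot backward.
- For a freshly created slot whose reserved restart_lsn is ahead of the target, the patch first polls the primary until the remote slot's restart_lsn reaches the local one, and then advances.
- In every other case it advances directly.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical outcome of one synchronization attempt. */
typedef enum
{
    SYNC_SKIP_BACKWARD,         /* existing slot is already past the target */
    SYNC_WAIT_FOR_PRIMARY,      /* new slot reserved WAL newer than target;
                                 * wait for the primary, then advance */
    SYNC_ADVANCE                /* fast-forward the local slot to target */
} SyncAction;

/* Hypothetical pure version of the checks in synchronize_one_slot(). */
static SyncAction
decide_sync_action(bool slot_exists, XLogRecPtr local_confirmed_flush,
                   XLogRecPtr local_restart_lsn, XLogRecPtr target_lsn)
{
    if (slot_exists)
        return target_lsn < local_confirmed_flush ? SYNC_SKIP_BACKWARD
                                                  : SYNC_ADVANCE;

    /* Freshly created slot: its restart_lsn comes from ReplicationSlotReserveWal(). */
    return target_lsn < local_restart_lsn ? SYNC_WAIT_FOR_PRIMARY
                                          : SYNC_ADVANCE;
}
```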
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38ae1b9ab8..816468a782 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -141,7 +141,8 @@ finish_sync_worker(void)
 	CommitTransactionCommand();
 
 	/* Find the main apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+	logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+							 MyLogicalRepWorker->subid, InvalidOid);
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -184,7 +185,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 		/* Check if the opposite worker is still running and bail if not. */
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid,
 										am_tablesync_worker() ? InvalidOid : relid,
 										false);
 		LWLockRelease(LogicalRepWorkerLock);
@@ -232,7 +234,8 @@ wait_for_worker_state_change(char expected_state)
 		 * waiting.
 		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+										MyLogicalRepWorker->subid,
 										InvalidOid, false);
 		if (worker && worker->proc)
 			logicalrep_worker_wakeup_ptr(worker);
@@ -432,7 +435,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 */
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+			syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+												MyLogicalRepWorker->subid,
 												rstate->relid, false);
 
 			if (syncworker)
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 843a878ff3..95d91bf236 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,11 +87,12 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_SLOTS
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				timeline_history show sql_cmd list_slots
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -124,6 +125,7 @@ command:
 			| drop_replication_slot
 			| timeline_history
 			| show
+			| list_slots
 			| sql_cmd
 			;
 
@@ -136,6 +138,15 @@ identify_system:
 					$$ = (Node *) makeNode(IdentifySystemCmd);
 				}
 			;
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+			K_LIST_SLOTS
+				{
+					$$ = (Node *) makeNode(ListSlotsCmd);
+				}
+			;
 
 /*
  * SHOW setting
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 7bb501f594..2d8accd985 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+LIST_SLOTS		{ return K_LIST_SLOTS; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..729935fc60 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -351,7 +351,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d1a8113cb6..6ca304b8f5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -426,6 +426,166 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(void)
+{
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	int			slotno;
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* need a tuple descriptor representing ten columns */
+	tupdesc = CreateTemplateTupleDesc(10);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+							  INT8OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+							  TEXTOID, -1, 0);
+
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+		char		restart_lsn_str[MAXFNAMELEN];
+		char		confirmed_flush_lsn_str[MAXFNAMELEN];
+		Datum		values[10];
+		bool		nulls[10];
+
+		ReplicationSlotPersistency persistency;
+		TransactionId xmin;
+		TransactionId catalog_xmin;
+		XLogRecPtr	restart_lsn;
+		XLogRecPtr	confirmed_flush_lsn;
+		Oid			datoid;
+		NameData	slot_name;
+		NameData	plugin;
+		int			i;
+		int64		tmpbigint;
+
+		if (!slot->in_use)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		xmin = slot->data.xmin;
+		catalog_xmin = slot->data.catalog_xmin;
+		datoid = slot->data.database;
+		restart_lsn = slot->data.restart_lsn;
+		confirmed_flush_lsn = slot->data.confirmed_flush;
+		namecpy(&slot_name, &slot->data.name);
+		namecpy(&plugin, &slot->data.plugin);
+		persistency = slot->data.persistency;
+
+		SpinLockRelease(&slot->mutex);
+
+		memset(nulls, 0, sizeof(nulls));
+
+		i = 0;
+		values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+		if (datoid == InvalidOid)
+			values[i++] = CStringGetTextDatum("physical");
+		else
+			values[i++] = CStringGetTextDatum("logical");
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			tmpbigint = datoid;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			MemoryContext cur = CurrentMemoryContext;
+
+			/* syscache access needs a transaction env. */
+			StartTransactionCommand();
+			/* make dbname live outside TX context */
+			MemoryContextSwitchTo(cur);
+			values[i++] = CStringGetTextDatum(get_database_name(datoid));
+			CommitTransactionCommand();
+			/* CommitTransactionCommand switches to TopMemoryContext */
+			MemoryContextSwitchTo(cur);
+		}
+
+		values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+		if (xmin != InvalidTransactionId)
+		{
+			tmpbigint = xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (catalog_xmin != InvalidTransactionId)
+		{
+			tmpbigint = catalog_xmin;
+			values[i++] = Int64GetDatum(tmpbigint);
+		}
+		else
+			nulls[i++] = true;
+
+		if (restart_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+					 (uint32) (restart_lsn >> 32),
+					 (uint32) restart_lsn);
+			values[i++] = CStringGetTextDatum(restart_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		if (confirmed_flush_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+					 "%X/%X",
+					 (uint32) (confirmed_flush_lsn >> 32),
+					 (uint32) confirmed_flush_lsn);
+			values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		/* send it to dest */
+		do_tup_output(tstate, values, nulls);
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	end_tup_output(tstate);
+}
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1551,6 +1711,10 @@ exec_replication_command(const char *cmd_string)
 			}
 			break;
 
+		case T_ListSlotsCmd:
+			ListSlots();
+			break;
+
 		case T_SQLCmd:
 			if (MyDatabaseId == InvalidOid)
 				ereport(ERROR,
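LIST_SLOTS returns restart_lsn and confirmed_flush as text in the usual %X/%X form. Whatever implements walrcv_list_slots() on the receiving side therefore has to turn that text back into an XLogRecPtr. That libpqwalreceiver part is not shown in this excerpt. The sketch below shows a minimal round trip using hypothetical format_lsn() and parse_lsn() helpers. Inside the server, reusing the existing pg_lsn input routine is probably preferable to hand-rolling a parser.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical helper: format an LSN as ListSlots does (high/low 32 bits in hex). */
static void
format_lsn(XLogRecPtr lsn, char *buf, size_t buflen)
{
    snprintf(buf, buflen, "%X/%X",
             (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/* Hypothetical helper: parse "%X/%X" text; returns false on malformed input. */
static bool
parse_lsn(const char *str, XLogRecPtr *lsn)
{
    unsigned int hi;
    unsigned int lo;
    int         consumed = 0;

    /* %n records how many characters were used, so trailing junk is rejected. */
    if (sscanf(str, "%X/%X%n", &hi, &lo, &consumed) != 2 || str[consumed] != '\0')
        return false;

    *lsn = ((XLogRecPtr) hi << 32) | lo;
    return true;
}
```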
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index cac6ff0eda..a773acab79 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -486,6 +486,7 @@ typedef enum NodeTag
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
+	T_ListSlotsCmd,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 66c948d85e..1101f71bcc 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,14 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+	NodeTag		type;
+} ListSlotsCmd;
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f1c10d16b8..52bdee746e 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -761,6 +761,7 @@ typedef enum
 	WAIT_EVENT_CHECKPOINTER_MAIN,
 	WAIT_EVENT_LOGICAL_APPLY_MAIN,
 	WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
+	WAIT_EVENT_REPL_SLOT_SYNC_MAIN,
 	WAIT_EVENT_PGSTAT_MAIN,
 	WAIT_EVENT_RECOVERY_WAL_ALL,
 	WAIT_EVENT_RECOVERY_WAL_STREAM,
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7964ae254f..1a0fd44876 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -205,4 +205,7 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
 #endif							/* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 5913b580c2..9014f92bfa 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -17,6 +17,7 @@
 #include "fmgr.h"
 #include "getaddrinfo.h"		/* for NI_MAXHOST */
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/latch.h"
 #include "storage/spin.h"
@@ -167,6 +168,13 @@ typedef struct
 	}			proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information received from remote.
+ *
+ * Currently this is the same as ReplicationSlotPersistentData.
+ */
+#define WalRecvReplicationSlotData ReplicationSlotPersistentData
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -211,6 +219,7 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli,
 											int *server_version);
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn);
 typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
 												   TimeLineID tli,
 												   char **filename,
@@ -240,6 +249,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_slots_fn walrcv_list_slots;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
 	walrcv_endstreaming_fn walrcv_endstreaming;
@@ -262,6 +272,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli, server_version) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
+#define walrcv_list_slots(conn) \
+	WalReceiverFunctions->walrcv_list_slots(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
 	WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
 #define walrcv_startstreaming(conn, options) \
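The new walrcv_list_slots() macro dispatches through WalReceiverFunctions. That table stays NULL until libpqwalreceiver is loaded and its _PG_init() installs it. This is why the launcher now calls load_file("libpqwalreceiver", false) before using the macro. It also means libpqwalreceiver must fill in the new walrcv_list_slots member, a part not shown here. The standalone sketch below illustrates the same pattern. Conn, ReceiverFunctions, recv_list_slots and module_init() are hypothetical stand-ins for the real types, macro and _PG_init().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for WalReceiverConn. */
typedef struct Conn
{
    int         nslots;
} Conn;

/* Hypothetical table of callbacks filled in by a loadable module. */
typedef struct
{
    int         (*list_slots) (Conn *conn);
} ReceiverFunctions;

/* NULL until the module is loaded, like WalReceiverFunctions. */
static ReceiverFunctions *receiver_functions = NULL;

/* Call sites go through a macro, mirroring walrcv_list_slots(conn). */
#define recv_list_slots(conn) receiver_functions->list_slots(conn)

/* "Module" side: the concrete implementation. */
static int
module_list_slots(Conn *conn)
{
    return conn->nslots;
}

static ReceiverFunctions module_functions = {
    .list_slots = module_list_slots,
};

/*
 * Hypothetical stand-in for the module's _PG_init(); the real one reports
 * an error if the table has already been installed.
 */
static void
module_init(void)
{
    assert(receiver_functions == NULL);
    receiver_functions = &module_functions;
}
```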
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index ef079111cd..ba2d3f1fca 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -69,14 +69,14 @@ extern LogicalRepWorker *MyLogicalRepWorker;
 extern bool in_remote_transaction;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
 					   bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid dbid, Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
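Adding dbid to logicalrep_worker_find() and the related functions matters because a slotsync worker has no subscription or relation. It is launched with both subid and relid set to InvalidOid. Without the database OID, two slotsync workers for different databases would be indistinguishable. The sketch below shows a minimal three-key lookup. It assumes that all three keys are compared; the matching code is in an earlier part of the patch not shown here. Worker and worker_find() are hypothetical simplifications of LogicalRepWorker and logicalrep_worker_find(), and the only_running flag is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Hypothetical simplification of LogicalRepWorker. */
typedef struct
{
    bool        in_use;
    Oid         dbid;
    Oid         subid;          /* InvalidOid for a slotsync worker */
    Oid         relid;          /* InvalidOid unless a tablesync worker */
} Worker;

/*
 * Hypothetical lookup: return the first in-use worker matching all three
 * keys, or NULL.  A slotsync worker is told apart only by its dbid.
 */
static Worker *
worker_find(Worker *workers, size_t n, Oid dbid, Oid subid, Oid relid)
{
    for (size_t i = 0; i < n; i++)
    {
        Worker     *w = &workers[i];

        if (w->in_use && w->dbid == dbid && w->subid == subid &&
            w->relid == relid)
            return w;
    }
    return NULL;
}
```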
-- 
2.17.1
