Hi,

Logical replication has been used for migrations with minimal downtime. However,
if you are dealing with a big database, the amount of required resources (disk,
due to WAL retention) increases as the backlog (WAL) increases. Unless you
have a generous amount of resources and can wait a long time for the new
replica to catch up, creating a logical replica is impractical on large
databases.

The general idea is to create and convert a physical replica or a base backup
(archived WAL files available) into a logical replica. The initial data copy
and catch-up tend to be faster on a physical replica. This technique has been
successfully used in pglogical_create_subscriber [1].

A new tool called pg_subscriber does this conversion and is tightly integrated
with Postgres.

DESIGN

The conversion requires 8 steps.

1. Check if the target data directory has the same system identifier as the
source data directory.
2. Stop the target server if it is running as a standby server. (Modifying
recovery parameters requires a restart.)
3. Create one replication slot per specified database on the source server. One
additional replication slot is created at the end to get the consistent LSN.
(This consistent LSN will be used as (a) a stopping point for the recovery
process and (b) a starting point for the subscriptions.)
4. Write recovery parameters into the target data directory and start the
target server. (Wait until the target server is promoted.)
5. Create one publication (FOR ALL TABLES) per specified database on the source
server.
6. Create one subscription per specified database on the target server. (Use
the replication slot and publication created in a previous step. Don't enable
the subscriptions yet.)
7. Set the replication progress to the consistent LSN that was obtained in a
previous step.
8. Enable the subscription for each specified database on the target server.
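
To make steps 5 to 8 more concrete, here is a rough sketch of the SQL commands
involved, written as small C helpers that only format the command text. It is
not the code from the attached patch. The helper names and the object names in
the usage note are made up for illustration. The sketch assumes that
identifiers and the connection string need no SQL quoting, that the
replication slot has the same name as the subscription, and that the
subscription's replication origin is named "pg_" followed by the subscription
OID.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Map an snprintf() result to -1 on error or truncation. */
static int
checked(int n, size_t buflen)
{
	return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}

/* Step 5 (source server): publish every table of the database. */
int
build_create_publication(char *buf, size_t buflen, const char *pubname)
{
	return checked(snprintf(buf, buflen,
							"CREATE PUBLICATION %s FOR ALL TABLES",
							pubname), buflen);
}

/*
 * Step 6 (target server): do not create a slot (the one from step 3 is
 * reused), skip the initial data copy, and leave the subscription disabled.
 */
int
build_create_subscription(char *buf, size_t buflen, const char *subname,
						  const char *pub_conninfo, const char *pubname)
{
	/* Assumption of this sketch: the connection string needs no escaping. */
	assert(strchr(pub_conninfo, '\'') == NULL);
	return checked(snprintf(buf, buflen,
							"CREATE SUBSCRIPTION %s CONNECTION '%s' "
							"PUBLICATION %s WITH (create_slot = false, "
							"copy_data = false, enabled = false)",
							subname, pub_conninfo, pubname), buflen);
}

/* Step 7 (target server): advance the subscription's origin to the LSN. */
int
build_set_progress(char *buf, size_t buflen, unsigned int suboid,
				   const char *consistent_lsn)
{
	return checked(snprintf(buf, buflen,
							"SELECT pg_catalog.pg_replication_origin_advance("
							"'pg_%u', '%s')",
							suboid, consistent_lsn), buflen);
}

/* Step 8 (target server): start streaming from the consistent LSN. */
int
build_enable_subscription(char *buf, size_t buflen, const char *subname)
{
	return checked(snprintf(buf, buflen, "ALTER SUBSCRIPTION %s ENABLE",
							subname), buflen);
}
```

For example, build_set_progress(buf, sizeof(buf), 16394, "0/3000060") fills buf
with a call that advances the origin pg_16394 to 0/3000060. The subscription
has to exist before step 7 because its OID is part of the origin name, which
is why it is created disabled and only enabled in step 8.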

This tool does not take a base backup. That can certainly be added later.
There is already a tool that does it: pg_basebackup.

There is a --subscriber-conninfo option to specify the subscriber connection
string. However, we could remove it, since this tool runs on the subscriber
and can build a connection string itself.
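
As a rough idea of what building that string could look like, the sketch below
assembles a local libpq connection string from a port and a database name. It
is only an illustration and not part of the patch. The function names are
hypothetical, and where the port would come from (for example the target's
configuration) is left open. Values are single-quoted, with single quotes and
backslashes escaped by a backslash, as libpq connection strings expect.
Omitting the host makes libpq fall back to its default local connection.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Append keyword='value' to the NUL-terminated string in buf, escaping
 * ' and \ in the value with a backslash. Returns 0 on success, or -1 if
 * buf is too small (buf is left unchanged in that case).
 */
static int
append_conninfo_param(char *buf, size_t buflen, const char *keyword,
					  const char *value)
{
	size_t		pos;
	size_t		need;
	const char *p;

	assert(buf != NULL && keyword != NULL && value != NULL);

	pos = strlen(buf);
	need = strlen(keyword) + 3;	/* keyword, '=', and two quotes */
	if (pos > 0)
		need++;					/* separating space */
	for (p = value; *p; p++)
		need += (*p == '\'' || *p == '\\') ? 2 : 1;
	if (pos + need + 1 > buflen)	/* +1 for the terminating NUL */
		return -1;

	if (pos > 0)
		buf[pos++] = ' ';
	pos += (size_t) sprintf(buf + pos, "%s='", keyword);
	for (p = value; *p; p++)
	{
		if (*p == '\'' || *p == '\\')
			buf[pos++] = '\\';
		buf[pos++] = *p;
	}
	buf[pos++] = '\'';
	buf[pos] = '\0';
	return 0;
}

/* Hypothetical helper: connection string for the local target server. */
int
build_local_conninfo(char *buf, size_t buflen, const char *port,
					 const char *dbname)
{
	if (buflen == 0)
		return -1;
	buf[0] = '\0';
	if (append_conninfo_param(buf, buflen, "port", port) != 0)
		return -1;
	return append_conninfo_param(buf, buflen, "dbname", dbname);
}
```

With port "5432" and database "hr", the result is port='5432' dbname='hr'.
Quoting every value keeps the helper simple and also covers database names
that contain spaces or quotes.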

NAME

I'm not sure about the proposed name. I came up with this one because it is not
too long. The most recently added tools use a pg_ prefix, a verb (action) and an
object. pg_initsubscriber and pg_createsubscriber are names that I considered,
but I'm not excited about them.

DOCUMENTATION

It is available and describes this tool.

TESTS

Basic tests are included. More tests are needed to exercise this tool.


Comments?

[1] https://github.com/2ndQuadrant/pglogical


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 5827245b8a06f906f603100c8fb27be533a6c0a1 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 11 Feb 2022 03:05:58 -0300
Subject: [PATCH v1 1/2] Move readfile() and free_readfile() to file_utils.h

Allow these functions to be used by other binaries.

There is a static function called readfile() in initdb.c too. Rename it
to avoid conflicting with the exposed function.
---
 src/bin/initdb/initdb.c         |  26 +++----
 src/bin/pg_ctl/pg_ctl.c         | 122 +-------------------------------
 src/common/file_utils.c         | 119 +++++++++++++++++++++++++++++++
 src/include/common/file_utils.h |   3 +
 4 files changed, 136 insertions(+), 134 deletions(-)

diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 97f15971e2..a4cbfeb954 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -243,8 +243,8 @@ static char **replace_token(char **lines,
 #ifndef HAVE_UNIX_SOCKETS
 static char **filter_lines_with_token(char **lines, const char *token);
 #endif
-static char **readfile(const char *path);
-static void writefile(char *path, char **lines);
+static char **read_text_file(const char *path);
+static void write_text_file(char *path, char **lines);
 static FILE *popen_check(const char *command, const char *mode);
 static char *get_id(void);
 static int	get_encoding_id(const char *encoding_name);
@@ -453,7 +453,7 @@ filter_lines_with_token(char **lines, const char *token)
  * get the lines from a text file
  */
 static char **
-readfile(const char *path)
+read_text_file(const char *path)
 {
 	char	  **result;
 	FILE	   *infile;
@@ -500,7 +500,7 @@ readfile(const char *path)
  * so that the resulting configuration files are nicely editable on Windows.
  */
 static void
-writefile(char *path, char **lines)
+write_text_file(char *path, char **lines)
 {
 	FILE	   *out_file;
 	char	  **line;
@@ -1063,7 +1063,7 @@ setup_config(void)
 
 	/* postgresql.conf */
 
-	conflines = readfile(conf_file);
+	conflines = read_text_file(conf_file);
 
 	snprintf(repltok, sizeof(repltok), "max_connections = %d", n_connections);
 	conflines = replace_token(conflines, "#max_connections = 100", repltok);
@@ -1214,7 +1214,7 @@ setup_config(void)
 
 	snprintf(path, sizeof(path), "%s/postgresql.conf", pg_data);
 
-	writefile(path, conflines);
+	write_text_file(path, conflines);
 	if (chmod(path, pg_file_create_mode) != 0)
 	{
 		pg_log_error("could not change permissions of \"%s\": %m", path);
@@ -1233,7 +1233,7 @@ setup_config(void)
 
 	sprintf(path, "%s/postgresql.auto.conf", pg_data);
 
-	writefile(path, autoconflines);
+	write_text_file(path, autoconflines);
 	if (chmod(path, pg_file_create_mode) != 0)
 	{
 		pg_log_error("could not change permissions of \"%s\": %m", path);
@@ -1245,7 +1245,7 @@ setup_config(void)
 
 	/* pg_hba.conf */
 
-	conflines = readfile(hba_file);
+	conflines = read_text_file(hba_file);
 
 #ifndef HAVE_UNIX_SOCKETS
 	conflines = filter_lines_with_token(conflines, "@remove-line-for-nolocal@");
@@ -1319,7 +1319,7 @@ setup_config(void)
 
 	snprintf(path, sizeof(path), "%s/pg_hba.conf", pg_data);
 
-	writefile(path, conflines);
+	write_text_file(path, conflines);
 	if (chmod(path, pg_file_create_mode) != 0)
 	{
 		pg_log_error("could not change permissions of \"%s\": %m", path);
@@ -1330,11 +1330,11 @@ setup_config(void)
 
 	/* pg_ident.conf */
 
-	conflines = readfile(ident_file);
+	conflines = read_text_file(ident_file);
 
 	snprintf(path, sizeof(path), "%s/pg_ident.conf", pg_data);
 
-	writefile(path, conflines);
+	write_text_file(path, conflines);
 	if (chmod(path, pg_file_create_mode) != 0)
 	{
 		pg_log_error("could not change permissions of \"%s\": %m", path);
@@ -1362,7 +1362,7 @@ bootstrap_template1(void)
 	printf(_("running bootstrap script ... "));
 	fflush(stdout);
 
-	bki_lines = readfile(bki_file);
+	bki_lines = read_text_file(bki_file);
 
 	/* Check that bki file appears to be of the right version */
 
@@ -1547,7 +1547,7 @@ setup_run_file(FILE *cmdfd, const char *filename)
 {
 	char	  **lines;
 
-	lines = readfile(filename);
+	lines = read_text_file(filename);
 
 	for (char **line = lines; *line != NULL; line++)
 	{
diff --git a/src/bin/pg_ctl/pg_ctl.c b/src/bin/pg_ctl/pg_ctl.c
index 3c182c97d4..b87beb7380 100644
--- a/src/bin/pg_ctl/pg_ctl.c
+++ b/src/bin/pg_ctl/pg_ctl.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_control.h"
 #include "common/controldata_utils.h"
 #include "common/file_perm.h"
+#include "common/file_utils.h"
 #include "common/logging.h"
 #include "common/string.h"
 #include "getopt_long.h"
@@ -150,8 +151,6 @@ static PTOKEN_PRIVILEGES GetPrivilegesToDelete(HANDLE hToken);
 #endif
 
 static pgpid_t get_pgpid(bool is_status_request);
-static char **readfile(const char *path, int *numlines);
-static void free_readfile(char **optlines);
 static pgpid_t start_postmaster(void);
 static void read_post_opts(void);
 
@@ -307,125 +306,6 @@ get_pgpid(bool is_status_request)
 }
 
 
-/*
- * get the lines from a text file - return NULL if file can't be opened
- *
- * Trailing newlines are deleted from the lines (this is a change from pre-v10)
- *
- * *numlines is set to the number of line pointers returned; there is
- * also an additional NULL pointer after the last real line.
- */
-static char **
-readfile(const char *path, int *numlines)
-{
-	int			fd;
-	int			nlines;
-	char	  **result;
-	char	   *buffer;
-	char	   *linebegin;
-	int			i;
-	int			n;
-	int			len;
-	struct stat statbuf;
-
-	*numlines = 0;				/* in case of failure or empty file */
-
-	/*
-	 * Slurp the file into memory.
-	 *
-	 * The file can change concurrently, so we read the whole file into memory
-	 * with a single read() call. That's not guaranteed to get an atomic
-	 * snapshot, but in practice, for a small file, it's close enough for the
-	 * current use.
-	 */
-	fd = open(path, O_RDONLY | PG_BINARY, 0);
-	if (fd < 0)
-		return NULL;
-	if (fstat(fd, &statbuf) < 0)
-	{
-		close(fd);
-		return NULL;
-	}
-	if (statbuf.st_size == 0)
-	{
-		/* empty file */
-		close(fd);
-		result = (char **) pg_malloc(sizeof(char *));
-		*result = NULL;
-		return result;
-	}
-	buffer = pg_malloc(statbuf.st_size + 1);
-
-	len = read(fd, buffer, statbuf.st_size + 1);
-	close(fd);
-	if (len != statbuf.st_size)
-	{
-		/* oops, the file size changed between fstat and read */
-		free(buffer);
-		return NULL;
-	}
-
-	/*
-	 * Count newlines. We expect there to be a newline after each full line,
-	 * including one at the end of file. If there isn't a newline at the end,
-	 * any characters after the last newline will be ignored.
-	 */
-	nlines = 0;
-	for (i = 0; i < len; i++)
-	{
-		if (buffer[i] == '\n')
-			nlines++;
-	}
-
-	/* set up the result buffer */
-	result = (char **) pg_malloc((nlines + 1) * sizeof(char *));
-	*numlines = nlines;
-
-	/* now split the buffer into lines */
-	linebegin = buffer;
-	n = 0;
-	for (i = 0; i < len; i++)
-	{
-		if (buffer[i] == '\n')
-		{
-			int			slen = &buffer[i] - linebegin;
-			char	   *linebuf = pg_malloc(slen + 1);
-
-			memcpy(linebuf, linebegin, slen);
-			/* we already dropped the \n, but get rid of any \r too */
-			if (slen > 0 && linebuf[slen - 1] == '\r')
-				slen--;
-			linebuf[slen] = '\0';
-			result[n++] = linebuf;
-			linebegin = &buffer[i + 1];
-		}
-	}
-	result[n] = NULL;
-
-	free(buffer);
-
-	return result;
-}
-
-
-/*
- * Free memory allocated for optlines through readfile()
- */
-static void
-free_readfile(char **optlines)
-{
-	char	   *curr_line = NULL;
-	int			i = 0;
-
-	if (!optlines)
-		return;
-
-	while ((curr_line = optlines[i++]))
-		free(curr_line);
-
-	free(optlines);
-}
-
 /*
  * start/test/stop routines
  */
diff --git a/src/common/file_utils.c b/src/common/file_utils.c
index 7138068633..1f53f088c6 100644
--- a/src/common/file_utils.c
+++ b/src/common/file_utils.c
@@ -398,6 +398,125 @@ durable_rename(const char *oldfile, const char *newfile)
 	return 0;
 }
 
+/*
+ * get the lines from a text file - return NULL if file can't be opened
+ *
+ * Trailing newlines are deleted from the lines (this is a change from pre-v10)
+ *
+ * *numlines is set to the number of line pointers returned; there is
+ * also an additional NULL pointer after the last real line.
+ */
+char **
+readfile(const char *path, int *numlines)
+{
+	int			fd;
+	int			nlines;
+	char	  **result;
+	char	   *buffer;
+	char	   *linebegin;
+	int			i;
+	int			n;
+	int			len;
+	struct stat statbuf;
+
+	*numlines = 0;				/* in case of failure or empty file */
+
+	/*
+	 * Slurp the file into memory.
+	 *
+	 * The file can change concurrently, so we read the whole file into memory
+	 * with a single read() call. That's not guaranteed to get an atomic
+	 * snapshot, but in practice, for a small file, it's close enough for the
+	 * current use.
+	 */
+	fd = open(path, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		return NULL;
+	if (fstat(fd, &statbuf) < 0)
+	{
+		close(fd);
+		return NULL;
+	}
+	if (statbuf.st_size == 0)
+	{
+		/* empty file */
+		close(fd);
+		result = (char **) pg_malloc(sizeof(char *));
+		*result = NULL;
+		return result;
+	}
+	buffer = pg_malloc(statbuf.st_size + 1);
+
+	len = read(fd, buffer, statbuf.st_size + 1);
+	close(fd);
+	if (len != statbuf.st_size)
+	{
+		/* oops, the file size changed between fstat and read */
+		free(buffer);
+		return NULL;
+	}
+
+	/*
+	 * Count newlines. We expect there to be a newline after each full line,
+	 * including one at the end of file. If there isn't a newline at the end,
+	 * any characters after the last newline will be ignored.
+	 */
+	nlines = 0;
+	for (i = 0; i < len; i++)
+	{
+		if (buffer[i] == '\n')
+			nlines++;
+	}
+
+	/* set up the result buffer */
+	result = (char **) pg_malloc((nlines + 1) * sizeof(char *));
+	*numlines = nlines;
+
+	/* now split the buffer into lines */
+	linebegin = buffer;
+	n = 0;
+	for (i = 0; i < len; i++)
+	{
+		if (buffer[i] == '\n')
+		{
+			int			slen = &buffer[i] - linebegin;
+			char	   *linebuf = pg_malloc(slen + 1);
+
+			memcpy(linebuf, linebegin, slen);
+			/* we already dropped the \n, but get rid of any \r too */
+			if (slen > 0 && linebuf[slen - 1] == '\r')
+				slen--;
+			linebuf[slen] = '\0';
+			result[n++] = linebuf;
+			linebegin = &buffer[i + 1];
+		}
+	}
+	result[n] = NULL;
+
+	free(buffer);
+
+	return result;
+}
+
+
+/*
+ * Free memory allocated for optlines through readfile()
+ */
+void
+free_readfile(char **optlines)
+{
+	char	   *curr_line = NULL;
+	int			i = 0;
+
+	if (!optlines)
+		return;
+
+	while ((curr_line = optlines[i++]))
+		free(curr_line);
+
+	free(optlines);
+}
+
 #endif							/* FRONTEND */
 
 /*
diff --git a/src/include/common/file_utils.h b/src/include/common/file_utils.h
index 2811744c12..27736e3dd7 100644
--- a/src/include/common/file_utils.h
+++ b/src/include/common/file_utils.h
@@ -30,6 +30,9 @@ extern void fsync_pgdata(const char *pg_data, int serverVersion);
 extern void fsync_dir_recurse(const char *dir);
 extern int	durable_rename(const char *oldfile, const char *newfile);
 extern int	fsync_parent_path(const char *fname);
+
+extern char **readfile(const char *path, int *numlines);
+extern void free_readfile(char **optlines);
 #endif
 
 extern PGFileType get_dirent_type(const char *path,
-- 
2.30.2

From 81788de158abbe1ffb378ffb0884b943232e51b0 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Fri, 11 Feb 2022 03:17:57 -0300
Subject: [PATCH v1 2/2] Create a new logical replica from a base backup or
 standby server.

A new tool called pg_subscriber can convert a physical replica or a base
backup into a logical replica. It runs on the target server and should
be able to connect to the source server (publisher) and the target
server (subscriber).

The conversion requires eight steps. Check if the target data directory
has the same system identifier as the source data directory. Stop the
target server if it is running as a standby server. Create one
replication slot per specified database on the source server. One
additional replication slot is created at the end to get the consistent
LSN. (This consistent LSN will be used as (a) a stopping point for the
recovery process and (b) a starting point for the subscriptions.) Write
recovery parameters into the target data directory and start the target
server. (Wait until the target server is promoted.) Create one
publication (FOR ALL TABLES) per specified database on the source
server. Create one subscription per specified database on the target
server. (Use the replication slot and publication created in a previous
step. Don't enable the subscriptions yet.) Set the replication progress
to the consistent LSN that was obtained in a previous step. Enable the
subscription for each specified database on the target server.

Depending on your workload and database size, creating a logical replica
might not be an option due to resource constraints (the WAL backlog must
be available until all table data is synchronized). The initial data copy
and the replication progress tend to be faster on a physical replica.
The purpose of this tool is to speed up a logical replica setup.
---
 doc/src/sgml/ref/allfiles.sgml        |    1 +
 doc/src/sgml/ref/pg_subscriber.sgml   |  242 ++++
 doc/src/sgml/reference.sgml           |    1 +
 src/bin/Makefile                      |    1 +
 src/bin/pg_subscriber/Makefile        |   39 +
 src/bin/pg_subscriber/pg_subscriber.c | 1463 +++++++++++++++++++++++++
 src/bin/pg_subscriber/t/001_basic.pl  |   41 +
 src/tools/msvc/Mkvcbuild.pm           |    2 +-
 8 files changed, 1789 insertions(+), 1 deletion(-)
 create mode 100644 doc/src/sgml/ref/pg_subscriber.sgml
 create mode 100644 src/bin/pg_subscriber/Makefile
 create mode 100644 src/bin/pg_subscriber/pg_subscriber.c
 create mode 100644 src/bin/pg_subscriber/t/001_basic.pl

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index d67270ccc3..eab7f2f616 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -212,6 +212,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgResetwal         SYSTEM "pg_resetwal.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
+<!ENTITY pgSubscriber       SYSTEM "pg_subscriber.sgml">
 <!ENTITY pgVerifyBackup     SYSTEM "pg_verifybackup.sgml">
 <!ENTITY pgtestfsync        SYSTEM "pgtestfsync.sgml">
 <!ENTITY pgtesttiming       SYSTEM "pgtesttiming.sgml">
diff --git a/doc/src/sgml/ref/pg_subscriber.sgml b/doc/src/sgml/ref/pg_subscriber.sgml
new file mode 100644
index 0000000000..e68a19092e
--- /dev/null
+++ b/doc/src/sgml/ref/pg_subscriber.sgml
@@ -0,0 +1,242 @@
+<!--
+doc/src/sgml/ref/pg_subscriber.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgsubscriber">
+ <indexterm zone="app-pgsubscriber">
+  <primary>pg_subscriber</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_subscriber</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_subscriber</refname>
+  <refpurpose>create a new logical replica from a base backup of a
+  <productname>PostgreSQL</productname> cluster or a standby
+  server</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_subscriber</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+  <para>
+   <application>pg_subscriber</application> takes the publisher and subscriber
+   connection strings, a base backup directory and a list of database names and
+   it sets up a new logical replica using the physical recovery process. A
+   standby server can also be used.
+  </para>
+
+  <para>
+   <application>pg_subscriber</application> should be run on the target
+   server. The source server (known as the publisher) should accept logical
+   replication connections from the target server (known as the subscriber).
+   The target server should accept local logical replication connections.
+  </para>
+
+  <para>
+   The transformation proceeds in eight steps. First,
+   <application>pg_subscriber</application> checks if the given target data
+   directory has the same system identifier as the source data directory.
+   Since it uses the recovery process as one of the steps, it starts the target
+   server as a replica from the source server. If the system identifier is not
+   the same, <application>pg_subscriber</application> will terminate with an
+   error.
+  </para>
+
+  <para>
+   Second, <application>pg_subscriber</application> checks if the target data
+   directory is used by a standby server and stops the standby server if it is
+   running. One of the next steps adds some recovery parameters that
+   require a server start. This step avoids an error.
+  </para>
+
+  <para>
+   Next, <application>pg_subscriber</application> creates one replication slot
+   for each specified database on the source server. The replication slot name
+   contains a <literal>pg_subscriber</literal> prefix. These replication slots
+   will be used by the subscriptions in a future step.  Another replication
+   slot is used to get a consistent start location. This consistent LSN will be
+   used (a) as a stopping point in the <xref
+   linkend="guc-recovery-target-lsn"/> parameter and (b) by the subscriptions
+   as a replication starting point. It guarantees that no transaction will be
+   lost.
+  </para>
+
+  <para>
+   Next, write recovery parameters into the target data directory and start the
+   target server. It specifies an LSN (the consistent LSN obtained in the
+   previous step) as the write-ahead log location up to which recovery will
+   proceed. It also specifies <literal>promote</literal> as the action that the
+   server should take once the recovery target is reached. This step finishes
+   once the server ends standby mode and is accepting read-write operations.
+  </para>
+
+  <para>
+   Next, <application>pg_subscriber</application> creates one publication for
+   each specified database on the source server. Each publication replicates
+   changes for all tables in the database. The publication name contains a
+   <literal>pg_subscriber</literal> prefix. These publications will be used by
+   the corresponding subscriptions in a later step.
+  </para>
+
+  <para>
+   Next, <application>pg_subscriber</application> creates one subscription for
+   each specified database on the target server. Each subscription name
+   contains a <literal>pg_subscriber</literal> prefix. The replication slot
+   name is identical to the subscription name. It also does not copy existing
+   data from the source server. It does not create a replication slot. Instead,
+   it uses the replication slot that was created in a previous step. The
+   subscription is created but it is not enabled yet. The reason is that the
+   replication progress must be set to the consistent LSN, but the replication
+   origin name contains the subscription OID, which is only known after the
+   subscription is created. Hence, it is enabled in a separate step.
+  </para>
+
+  <para>
+   Next, <application>pg_subscriber</application> sets the replication progress
+   to the consistent LSN that was obtained in a previous step. When the target
+   server started the recovery process, it caught up to the consistent LSN.
+   This is the exact LSN to be used as an initial location for the logical
+   replication.
+  </para>
+
+  <para>
+   Finally, <application>pg_subscriber</application> enables the subscription
+   for each specified database on the target server. The subscription starts
+   streaming from the consistent LSN.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_subscriber</application> accepts the following
+    command-line arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        The target directory that contains a base backup. It can also be a
+        cluster directory from a standby server.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-P <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--publisher-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the publisher. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-S <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--subscriber-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the subscriber. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+     <varlistentry>
+      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
+      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
+      <listitem>
+       <para>
+        The name of the database in which to create the subscription. Multiple
+        databases can be selected by writing multiple <option>-d</option> switches.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode. This will cause
+        <application>pg_subscriber</application> to output progress messages
+        and detailed information about each step.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other options are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_subscriber</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</option></term>
+       <term><option>--help</option></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_subscriber</application> command
+       line arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To create a logical replica for database <literal>bar</literal> from a base
+   backup of the server at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_basebackup -h foo -D /usr/local/pgsql/data</userinput>
+<prompt>$</prompt> <userinput>pg_subscriber -D /usr/local/pgsql/data -P "host=foo" -S "host=localhost" -d bar</userinput>
+</screen>
+  </para>
+
+  <para>
+   To create a logical replica for databases <literal>hr</literal> and
+   <literal>finance</literal> from a standby server at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_subscriber -D /usr/local/pgsql/data -P "host=foo" -S "host=localhost" -d hr -d finance</userinput>
+</screen>
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="app-pgbasebackup"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index da421ff24e..3566c6050c 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -256,6 +256,7 @@
    &pgReceivewal;
    &pgRecvlogical;
    &pgRestore;
+   &pgSubscriber;
    &pgVerifyBackup;
    &psqlRef;
    &reindexdb;
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 7f9dde924e..6c4d3c1ffe 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -25,6 +25,7 @@ SUBDIRS = \
 	pg_dump \
 	pg_resetwal \
 	pg_rewind \
+	pg_subscriber \
 	pg_test_fsync \
 	pg_test_timing \
 	pg_upgrade \
diff --git a/src/bin/pg_subscriber/Makefile b/src/bin/pg_subscriber/Makefile
new file mode 100644
index 0000000000..c48dca6e49
--- /dev/null
+++ b/src/bin/pg_subscriber/Makefile
@@ -0,0 +1,39 @@
+# src/bin/pg_subscriber/Makefile
+
+PGFILEDESC = "pg_subscriber - create a new logical replica from a base backup or a standby server"
+PGAPPICON=win32
+
+subdir = src/bin/pg_subscriber
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
+
+OBJS = \
+	$(WIN32RES) \
+	pg_subscriber.o
+
+all: pg_subscriber
+
+pg_subscriber: $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+	$(INSTALL_PROGRAM) pg_subscriber$(X) '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
+
+clean distclean maintainer-clean:
+	rm -f pg_subscriber$(X) $(OBJS)
+	rm -rf tmp_check
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
diff --git a/src/bin/pg_subscriber/pg_subscriber.c b/src/bin/pg_subscriber/pg_subscriber.c
new file mode 100644
index 0000000000..7565950a08
--- /dev/null
+++ b/src/bin/pg_subscriber/pg_subscriber.c
@@ -0,0 +1,1463 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_subscriber.c
+ *	  Create a new logical replica from a base backup or a standby server
+ *
+ * Copyright (C) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_subscriber/pg_subscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "catalog/pg_control.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_utils.h"
+#include "common/logging.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+#include "utils/pidfile.h"
+
+typedef struct LogicalRepInfo
+{
+	Oid			oid;			/* database OID */
+	char	   *dbname;			/* database name */
+	char	   *pubconninfo;	/* publication connection string for logical
+								 * replication */
+	char	   *subconninfo;	/* subscription connection string for logical
+								 * replication */
+	char	   *pubname;		/* publication name */
+	char	   *subname;		/* subscription name (also replication slot
+								 * name) */
+
+	bool		made_replslot;		/* replication slot was created */
+	bool		made_publication;	/* publication was created */
+	bool		made_subscription;	/* subscription was created */
+}			LogicalRepInfo;
+
+static void cleanup_objects_atexit(void);
+static void usage(void);
+static char *get_base_conninfo(char *conninfo, char *dbname,
+							   const char *noderole);
+static bool check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static PGconn *connect_database(const char *conninfo, bool secure_search_path);
+static void disconnect_database(PGconn *conn);
+static char *get_sysid_from_conn(const char *conninfo);
+static char *get_control_from_datadir(const char *datadir);
+static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+											 const char *slot_name);
+static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action);
+static bool postmaster_is_alive(pid_t pid);
+static void wait_postmaster_connection(const char *conninfo);
+static void wait_for_end_recovery(const char *conninfo);
+static void create_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn);
+static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+
+#define	USEC_PER_SEC	1000000
+#define	WAIT_INTERVAL	1		/* 1 second */
+
+/* Options */
+const char *progname;
+static char *subscriber_dir = NULL;
+static char *pub_conninfo_str = NULL;
+static char *sub_conninfo_str = NULL;
+static SimpleStringList database_names = {NULL, NULL};
+static int	verbose = 0;
+static bool	success = false;
+
+static LogicalRepInfo  *dbinfo;
+
+static int		num_dbs = 0;
+
+static char		temp_replslot[NAMEDATALEN];
+static bool		made_temp_replslot = false;
+
+char		pidfile[MAXPGPATH]; /* subscriber PID file */
+
+enum WaitPMResult
+{
+	POSTMASTER_READY,
+	POSTMASTER_STANDBY,
+	POSTMASTER_STILL_STARTING,
+	POSTMASTER_FAILED
+};
+
+
+/*
+ * Clean up objects that were created by pg_subscriber if there is an error.
+ *
+ * Replication slots, publications and subscriptions are created. Depending on
+ * the step at which it failed, remove the objects that were already created,
+ * if possible (sometimes this won't work due to a connection issue).
+ */
+static void
+cleanup_objects_atexit(void)
+{
+	PGconn	*conn;
+	int		i;
+
+	/* Nothing to do on success or if no database information was stored yet. */
+	if (success || dbinfo == NULL)
+		return;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		if (dbinfo[i].made_subscription)
+		{
+			conn = connect_database(dbinfo[i].subconninfo, true);
+			if (conn != NULL)
+			{
+				drop_subscription(conn, &dbinfo[i]);
+				disconnect_database(conn);
+			}
+		}
+
+		if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+		{
+			conn = connect_database(dbinfo[i].pubconninfo, true);
+			if (conn != NULL)
+			{
+				if (dbinfo[i].made_publication)
+					drop_publication(conn, &dbinfo[i]);
+				if (dbinfo[i].made_replslot)
+					drop_replication_slot(conn, &dbinfo[i], dbinfo[i].subname);
+				disconnect_database(conn);
+			}
+		}
+	}
+
+	if (made_temp_replslot)
+	{
+		conn = connect_database(dbinfo[0].pubconninfo, true);
+		if (conn != NULL)
+		{
+			drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+			disconnect_database(conn);
+		}
+	}
+}
+
+static void
+usage(void)
+{
+	printf(_("%s creates a new logical replica from a base backup or a standby server.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
+	printf(_(" -P, --publisher-conninfo=CONNINFO   publisher connection string\n"));
+	printf(_(" -S, --subscriber-conninfo=CONNINFO  subscriber connection string\n"));
+	printf(_(" -d, --database=DBNAME               database in which to create a subscription\n"));
+	printf(_(" -v, --verbose                       output verbose messages\n"));
+	printf(_(" -V, --version                       output version information, then exit\n"));
+	printf(_(" -?, --help                          show this help, then exit\n"));
+	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name plus a fallback application name.
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection string.
+ * If the second argument (dbname) is not null and the provided connection
+ * string contains a database name, that name is handed back through dbname.
+ * If option --database is not provided, dbname is used as the only database
+ * to set up the logical replica.
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(char *conninfo, char *dbname, const char *noderole)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	PQconninfoOption *conn_opts = NULL;
+	PQconninfoOption *conn_opt;
+	char	   *errmsg = NULL;
+	char	   *ret;
+	int			i;
+
+	if (verbose)
+		pg_log_info("validating connection string on %s", noderole);
+
+	conn_opts = PQconninfoParse(conninfo, &errmsg);
+	if (conn_opts == NULL)
+	{
+		pg_log_error("could not parse connection string: %s", errmsg);
+		return NULL;
+	}
+
+	i = 0;
+	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+	{
+		if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+		{
+			/* dbname must point to a buffer of at least NAMEDATALEN bytes */
+			if (dbname)
+				strlcpy(dbname, conn_opt->val, NAMEDATALEN);
+			continue;
+		}
+
+		if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+		{
+			if (i > 0)
+				appendPQExpBufferChar(buf, ' ');
+			appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+			i++;
+		}
+	}
+
+	if (i > 0)
+		appendPQExpBufferChar(buf, ' ');
+	appendPQExpBuffer(buf, "fallback_application_name=%s", progname);
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+	PQconninfoFree(conn_opts);
+
+	return ret;
+}
+
+/*
+ * Is it a cluster directory? These are preliminary checks and are far from
+ * accurate. If the directory is not a clone of the publisher, a later step
+ * will eventually fail.
+ */
+static bool
+check_data_directory(const char *datadir)
+{
+	struct stat statbuf;
+	char		versionfile[MAXPGPATH];
+
+	if (verbose)
+		pg_log_info("checking if directory \"%s\" is a cluster data directory",
+					datadir);
+
+	if (stat(datadir, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_log_error("data directory \"%s\" does not exist", datadir);
+		else
+			pg_log_error("could not access directory \"%s\": %s", datadir, strerror(errno));
+
+		return false;
+	}
+
+	snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+	if (stat(versionfile, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_log_error("directory \"%s\" is not a database cluster directory", datadir);
+		else
+			pg_log_error("could not access file \"%s\": %s", versionfile, strerror(errno));
+
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	Assert(conninfo != NULL);
+
+	appendPQExpBufferStr(buf, conninfo);
+	appendPQExpBuffer(buf, " dbname=%s", dbname);
+	appendPQExpBufferStr(buf, " replication=database");
+
+	ret = pg_strdup(buf->data);
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
+static PGconn *
+connect_database(const char *conninfo, bool secure_search_path)
+{
+	PGconn	   *conn;
+
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_log_error("connection to database failed: %s", PQerrorMessage(conn));
+		PQfinish(conn);
+		return NULL;
+	}
+
+	/* secure search_path */
+	if (secure_search_path)
+	{
+		PGresult   *res;
+
+		res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res));
+			PQclear(res);
+			PQfinish(conn);
+			return NULL;
+		}
+		PQclear(res);
+	}
+
+	return conn;
+}
+
+static void
+disconnect_database(PGconn *conn)
+{
+	Assert(conn != NULL);
+
+	PQfinish(conn);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to compare if a data directory is a clone of another one.
+ */
+static char *
+get_sysid_from_conn(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	char	   *repconninfo;
+	char	   *sysid = NULL;
+
+	if (verbose)
+		pg_log_info("getting system identifier from publisher");
+
+	repconninfo = psprintf("%s replication=database", conninfo);
+	conn = connect_database(repconninfo, false);
+	pfree(repconninfo);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "IDENTIFY_SYSTEM", PQresultErrorMessage(res));
+		PQclear(res);
+		disconnect_database(conn);
+		return NULL;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+
+		PQclear(res);
+		disconnect_database(conn);
+		return NULL;
+	}
+
+	sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+	PQclear(res);
+	disconnect_database(conn);
+
+	return sysid;
+}
+
+/*
+ * Obtain the system identifier from control file. It will be used to compare
+ * if a data directory is a clone of another one. This routine is used locally
+ * and avoids a replication connection.
+ */
+static char *
+get_control_from_datadir(const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	char	   *sysid = pg_malloc(32);
+
+	if (verbose)
+		pg_log_info("getting system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	snprintf(sysid, 32, UINT64_FORMAT, cf->system_identifier);
+
+	pfree(cf);
+
+	return sysid;
+}
+
+/*
+ * Create a logical replication slot and return the consistent LSN. The
+ * returned LSN is used to catch the subscriber up to the required point.
+ *
+ * XXX CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the consistent LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+								const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	char	   *lsn = NULL;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+	appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		destroyPQExpBuffer(str);
+		return lsn;
+	}
+
+	/* for cleanup purposes */
+	if (strcmp(slot_name, temp_replslot) == 0)
+		made_temp_replslot = true;
+	else
+		dbinfo->made_replslot = true;
+
+	lsn = pg_strdup(PQgetvalue(res, 0, 1));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+
+	return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+					 PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc, int action)
+{
+	if (rc != 0)
+	{
+		if (WIFEXITED(rc))
+		{
+			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+		}
+		else if (WIFSIGNALED(rc))
+		{
+#if defined(WIN32)
+			pg_log_error("pg_ctl was terminated by exception 0x%X", WTERMSIG(rc));
+			fprintf(stderr,
+					"See C include file \"ntstatus.h\" for a description of the hexadecimal value.\n");
+#else
+			pg_log_error("pg_ctl was terminated by signal %d: %s",
+						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+		}
+		else
+		{
+			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+		}
+
+		fprintf(stderr, "The failed command was: %s\n", pg_ctl_cmd);
+		exit(1);
+	}
+
+	if (verbose)
+	{
+		if (action)
+			pg_log_info("postmaster was started");
+		else
+			pg_log_info("postmaster was stopped");
+	}
+}
+
+/*
+ * XXX This function was copied from pg_ctl.c.
+ *
+ * We should probably move it to a common place.
+ */
+static bool
+postmaster_is_alive(pid_t pid)
+{
+	/*
+	 * Test to see if the process is still there.  Note that we do not
+	 * consider an EPERM failure to mean that the process is still there;
+	 * EPERM must mean that the given PID belongs to some other userid, and
+	 * considering the permissions on $PGDATA, that means it's not the
+	 * postmaster we are after.
+	 *
+	 * Don't believe that our own PID or parent shell's PID is the postmaster,
+	 * either.  (Windows hasn't got getppid(), though.)
+	 */
+	if (pid == getpid())
+		return false;
+#ifndef WIN32
+	if (pid == getppid())
+		return false;
+#endif
+	if (kill(pid, 0) == 0)
+		return true;
+	return false;
+}
+
+/*
+ * Returns after postmaster is accepting connections.
+ */
+static void
+wait_postmaster_connection(const char *conninfo)
+{
+	PGPing		ret;
+	long		pmpid;
+	int			status = POSTMASTER_STILL_STARTING;
+
+	if (verbose)
+		pg_log_info("waiting for the postmaster to allow connections ...");
+
+	/*
+	 * Wait for the postmaster to come up. XXX this code path is a modified
+	 * version of wait_for_postmaster().
+	 */
+	for (;;)
+	{
+		char	  **optlines;
+		int			numlines;
+
+		if ((optlines = readfile(pidfile, &numlines)) != NULL &&
+			numlines >= LOCK_FILE_LINE_PM_STATUS)
+		{
+			/*
+			 * Check the status line (this assumes a v10 or later server).
+			 */
+			char	   *pmstatus = optlines[LOCK_FILE_LINE_PM_STATUS - 1];
+
+			pmpid = atol(optlines[LOCK_FILE_LINE_PID - 1]);
+
+			if (strcmp(pmstatus, PM_STATUS_READY) == 0)
+			{
+				free_readfile(optlines);
+				status = POSTMASTER_READY;
+				break;
+			}
+			else if (strcmp(pmstatus, PM_STATUS_STANDBY) == 0)
+			{
+				free_readfile(optlines);
+				status = POSTMASTER_STANDBY;
+				break;
+			}
+		}
+
+		free_readfile(optlines);
+
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+	}
+
+	if (verbose)
+		pg_log_info("postmaster.pid is available");
+
+	if (status == POSTMASTER_STILL_STARTING)
+	{
+		pg_log_error("server did not start in time");
+		exit(1);
+	}
+	else if (status == POSTMASTER_STANDBY)
+	{
+		pg_log_error("server is running but hot standby mode is not enabled");
+		exit(1);
+	}
+	else if (status == POSTMASTER_FAILED)
+	{
+		pg_log_error("could not start server");
+		fprintf(stderr, "Examine the log output.\n");
+		exit(1);
+	}
+
+	if (verbose)
+	{
+		pg_log_info("postmaster is up and running");
+		pg_log_info("waiting until the postmaster accepts connections ...");
+	}
+
+	/* Postmaster is up. Let's wait for it to accept connections. */
+	for (;;)
+	{
+		ret = PQping(conninfo);
+		if (ret == PQPING_OK)
+			break;
+		else if (ret == PQPING_NO_ATTEMPT)
+			break;
+
+		/*
+		 * Postmaster started but for some reason it crashed leaving a
+		 * postmaster.pid.
+		 */
+		if (!postmaster_is_alive((pid_t) pmpid))
+		{
+			pg_log_error("could not start server");
+			fprintf(stderr, "Examine the log output.\n");
+			exit(1);
+		}
+
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+	}
+
+	if (verbose)
+		pg_log_info("postmaster is accepting connections");
+}
+
+/*
+ * Returns after the server finishes the recovery process.
+ */
+static void
+wait_for_end_recovery(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	int			status = POSTMASTER_STILL_STARTING;
+
+	if (verbose)
+		pg_log_info("waiting for the postmaster to reach a consistent state ...");
+
+	conn = connect_database(conninfo, true);
+	if (conn == NULL)
+		exit(1);
+
+	for (;;)
+	{
+		bool		in_recovery;
+
+		res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain recovery progress");
+			exit(1);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("unexpected result from pg_is_in_recovery function");
+			exit(1);
+		}
+
+		in_recovery = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+
+		PQclear(res);
+
+		/* Has the recovery process finished? */
+		if (!in_recovery)
+		{
+			status = POSTMASTER_READY;
+			break;
+		}
+
+		/* Keep waiting. */
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+	}
+
+	disconnect_database(conn);
+
+	if (status == POSTMASTER_STILL_STARTING)
+	{
+		pg_log_error("server did not end recovery");
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("postmaster reached a consistent state");
+}
+
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Check if the publication needs to be created. */
+	appendPQExpBuffer(str,
+					  "SELECT puballtables FROM pg_catalog.pg_publication WHERE pubname = '%s'",
+					  dbinfo->pubname);
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) == 1)
+	{
+		/*
+		 * If the publication name already exists and puballtables is true,
+		 * use it. A previous run of pg_subscriber must have created this
+		 * publication. There is nothing else to do.
+		 */
+		if (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
+		{
+			if (verbose)
+				pg_log_info("publication \"%s\" already exists", dbinfo->pubname);
+			PQclear(res);
+			destroyPQExpBuffer(str);
+			return;
+		}
+		else
+		{
+			/*
+			 * XXX Unfortunately, if it reaches this code path, pg_subscriber
+			 * will always fail here. That's bad, but the user is not expected
+			 * to choose a publication name with the pg_subscriber_ prefix
+			 * followed by the exact database OID for a publication in which
+			 * puballtables is false.
+			 */
+			pg_log_error("publication \"%s\" does not replicate changes for all tables",
+						 dbinfo->pubname);
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	PQclear(res);
+	resetPQExpBuffer(str);
+
+	if (verbose)
+		pg_log_info("creating publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+					 dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_publication = true;
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it. By
+ * default, the subscription name is used as replication slot name. It is
+ * not required to copy data. The subscription will be created but it will not
+ * be enabled now. That's because the replication progress must be set and the
+ * replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("creating subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s "
+					  "WITH (create_slot = false, copy_data = false, enabled = false)",
+					  dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+					 dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_subscription = true;
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove subscription if it couldn't finish all steps.
+ */
+static void
+drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("dropping subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", dbinfo->subname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the temporary
+ * replication slot. The goal is to set up the initial location for logical
+ * replication, which is the exact LSN at which the subscriber was promoted.
+ * Once the subscription is enabled, it starts streaming from that location
+ * onwards.
+ */
+static void
+set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	Oid			suboid;
+	char		originname[NAMEDATALEN];
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(str,
+					  "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", dbinfo->subname);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscription OID: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+					 PQntuples(res), 1);
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	/*
+	 * The origin name is defined as pg_%u. %u is the subscription OID. See
+	 * ApplyWorkerMain().
+	 */
+	suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+	snprintf(originname, sizeof(originname), "pg_%u", suboid);
+
+	PQclear(res);
+
+	if (verbose)
+		pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
+					originname, lsn, dbinfo->dbname);
+
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", originname, lsn);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+					 dbinfo->subname, PQresultErrorMessage(res));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial location, enabling the subscription is the last step
+ * of this setup.
+ */
+static void
+enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	if (verbose)
+		pg_log_info("enabling subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname);
+
+	if (verbose)
+		pg_log_info("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not enable subscription \"%s\": %s", dbinfo->subname,
+					 PQerrorMessage(conn));
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] =
+	{
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"publisher-conninfo", required_argument, NULL, 'P'},
+		{"subscriber-conninfo", required_argument, NULL, 'S'},
+		{"database", required_argument, NULL, 'd'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	int			c;
+	int			option_index;
+
+	char	   *pg_ctl_path;
+	char	   *pg_ctl_cmd;
+	int			rc;
+
+	SimpleStringListCell *cell;
+
+	char	   *pub_base_conninfo = NULL;
+	char	   *sub_base_conninfo = NULL;
+	char	   *dbname_conninfo;
+
+	char	   *pub_sysid;
+	char	   *sub_sysid;
+	struct stat statbuf;
+
+	PGconn	   *conn;
+	char	   *consistent_lsn;
+
+	PQExpBuffer recoveryconfcontents = NULL;
+
+	int			i;
+
+	pg_logging_init(argv[0]);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_subscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_subscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		fprintf(stderr, _("You must run %s as the PostgreSQL superuser.\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
+	while ((c = getopt_long(argc, argv, "D:P:S:d:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				subscriber_dir = pg_strdup(optarg);
+				break;
+			case 'P':
+				pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'S':
+				sub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'd':
+				simple_string_list_append(&database_names, optarg);
+				num_dbs++;
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (subscriber_dir == NULL)
+	{
+		pg_log_error("no subscriber data directory specified");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Parse connection string. Build a base connection string that might be
+	 * reused by multiple databases.
+	 */
+	if (pub_conninfo_str == NULL)
+	{
+		/*
+		 * FIXME use primary_conninfo (if available) from subscriber and
+		 * extract publisher connection string. Assume that there are
+		 * identical entries for physical and logical replication. If there is
+		 * not, we would fail anyway.
+		 */
+		pg_log_error("no publisher connection string specified");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+	dbname_conninfo = pg_malloc0(NAMEDATALEN);
+	pub_base_conninfo = get_base_conninfo(pub_conninfo_str, dbname_conninfo,
+										  "publisher");
+	if (pub_base_conninfo == NULL)
+		exit(1);
+
+	if (sub_conninfo_str == NULL)
+	{
+		pg_log_error("no subscriber connection string specified");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+	sub_base_conninfo = get_base_conninfo(sub_conninfo_str, NULL, "subscriber");
+	if (sub_base_conninfo == NULL)
+		exit(1);
+
+	if (database_names.head == NULL)
+	{
+		if (verbose)
+			pg_log_info("no database was specified");
+
+		/*
+		 * If --database option is not provided, try to obtain the dbname from
+		 * the publisher conninfo. If dbname parameter is not available, error
+		 * out.
+		 */
+		if (dbname_conninfo && dbname_conninfo[0] != '\0')
+		{
+			simple_string_list_append(&database_names, dbname_conninfo);
+			num_dbs++;
+
+			if (verbose)
+				pg_log_info("database \"%s\" was extracted from the publisher connection string",
+							dbname_conninfo);
+		}
+		else
+		{
+			pg_log_error("no database name specified");
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+					progname);
+			exit(1);
+		}
+	}
+
+	/*
+	 * Get the absolute pg_ctl path on the subscriber.
+	 */
+	pg_ctl_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(argv[0], "pg_ctl",
+						 "pg_ctl (PostgreSQL) " PG_VERSION "\n",
+						 pg_ctl_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(argv[0], full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_ctl", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_ctl", full_path, progname);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("pg_ctl path is: %s", pg_ctl_path);
+
+	/* rudimentary check for a data directory. */
+	if (!check_data_directory(subscriber_dir))
+		exit(1);
+
+	/* subscriber PID file. */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+	/* Store database information for publisher and subscriber. */
+	dbinfo = (LogicalRepInfo *) pg_malloc(num_dbs * sizeof(LogicalRepInfo));
+	i = 0;
+	for (cell = database_names.head; cell; cell = cell->next)
+	{
+		char	   *conninfo;
+
+		/* Publisher. */
+		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+		dbinfo[i].pubconninfo = conninfo;
+		dbinfo[i].dbname = cell->val;
+		dbinfo[i].made_replslot = false;
+		dbinfo[i].made_publication = false;
+		dbinfo[i].made_subscription = false;
+		/* other struct fields will be filled later. */
+
+		/* Subscriber. */
+		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+		dbinfo[i].subconninfo = conninfo;
+
+		i++;
+	}
+
+	/*
+	 * Check if the subscriber data directory has the same system identifier
+	 * as the publisher data directory.
+	 */
+	pub_sysid = get_sysid_from_conn(dbinfo[0].pubconninfo);
+	if (pub_sysid == NULL)
+		exit(1);
+	sub_sysid = get_control_from_datadir(subscriber_dir);
+	if (strcmp(pub_sysid, sub_sysid) != 0)
+	{
+		pg_log_error("subscriber data directory is not a base backup from the publisher");
+		exit(1);
+	}
+
+	/*
+	 * Stop the subscriber if it is a standby server. Before executing the
+	 * transformation steps, make sure the subscriber is not running because
+	 * one of the steps is to modify some recovery parameters that require a
+	 * restart.
+	 */
+	if (stat(pidfile, &statbuf) == 0)
+	{
+		if (verbose)
+		{
+			pg_log_info("subscriber is up and running");
+			pg_log_info("stopping the server to start the transformation steps");
+		}
+
+		pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+		rc = system(pg_ctl_cmd);
+		pg_ctl_status(pg_ctl_cmd, rc, 0);
+	}
+
+	/*
+	 * Create a replication slot for each database on the publisher.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		PGresult   *res;
+		char		replslotname[NAMEDATALEN];
+
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		res = PQexec(conn,
+					 "SELECT oid FROM pg_catalog.pg_database WHERE datname = current_database()");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain database OID: %s", PQresultErrorMessage(res));
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain database OID: got %d rows, expected %d rows",
+						 PQntuples(res), 1);
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+
+		/* Remember database OID. */
+		dbinfo[i].oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+		PQclear(res);
+
+		/*
+		 * Build the replication slot name. The name must not exceed
+		 * NAMEDATALEN - 1. The current scheme uses a maximum of 36
+		 * characters (14 + 10 + 1 + 10 + '\0'). The process ID is included
+		 * to reduce the probability of collision. By default, the
+		 * subscription name is used as the replication slot name.
+		 */
+		snprintf(replslotname, sizeof(replslotname),
+				 "pg_subscriber_%u_%d",
+				 dbinfo[i].oid,
+				 (int) getpid());
+		dbinfo[i].subname = pg_strdup(replslotname);
+
+		/* Create replication slot on publisher. */
+		if (create_logical_replication_slot(conn, &dbinfo[i], replslotname) != NULL)
+			pg_log_info("create replication slot \"%s\" on publisher", replslotname);
+		else
+			exit(1);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Create a temporary logical replication slot to get a consistent LSN.
+	 *
+	 * This consistent LSN will be used later to advance the recently created
+	 * replication slots. We could probably use the last created replication
+	 * slot, however, if this tool decides to support cloning the publisher
+	 * (via pg_basebackup -- after creating the replication slots), the
+	 * consistent point should be after the pg_basebackup finishes.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo, false);
+	if (conn == NULL)
+		exit(1);
+	snprintf(temp_replslot, sizeof(temp_replslot), "pg_subscriber_%d_tmp",
+			 (int) getpid());
+	consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0],
+													 temp_replslot);
+
+	/*
+	 * Write recovery parameters.
+	 *
+	 * Although the recovery parameters will be written to the subscriber,
+	 * use a publisher connection for the following recovery functions. The
+	 * connection is only used to check the current server version (physical
+	 * replica, same server version). The subscriber is not running yet.
+	 */
+	recoveryconfcontents = GenerateRecoveryConfig(conn, NULL);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+					  consistent_lsn);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_inclusive = true\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_action = promote\n");
+
+	WriteRecoveryConfig(conn, subscriber_dir, recoveryconfcontents);
+	disconnect_database(conn);
+
+	/*
+	 * Start subscriber and wait until accepting connections.
+	 */
+	if (verbose)
+		pg_log_info("starting the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 1);
+	wait_postmaster_connection(dbinfo[0].subconninfo);
+
+	/*
+	 * Wait for the subscriber to be promoted.
+	 */
+	wait_for_end_recovery(dbinfo[0].subconninfo);
+
+	/*
+	 * Create a publication for each database. This step should be executed
+	 * after promoting the subscriber to avoid replicating unnecessary
+	 * objects.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		char		pubname[NAMEDATALEN];
+
+		/* Connect to publisher. */
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		/*
+		 * Build the publication name. The name must not exceed NAMEDATALEN -
+		 * 1. The current scheme uses a maximum of 25 characters (14 + 10 +
+		 * '\0').
+		 */
+		snprintf(pubname, sizeof(pubname), "pg_subscriber_%u", dbinfo[i].oid);
+		dbinfo[i].pubname = pg_strdup(pubname);
+
+		create_publication(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Create a subscription for each database.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		/* Connect to subscriber. */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+		if (conn == NULL)
+			exit(1);
+
+		create_subscription(conn, &dbinfo[i]);
+
+		/* Set the replication progress to the correct LSN. */
+		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+		/* Enable subscription. */
+		enable_subscription(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * The temporary replication slot is no longer required. Drop it.
+	 * XXX we should probably not fail here. Instead, provide a warning so the user
+	 * XXX eventually drops the replication slot later.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo, true);
+	if (conn == NULL)
+		exit(1);
+	drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+	disconnect_database(conn);
+
+	/*
+	 * Stop the subscriber.
+	 */
+	if (verbose)
+		pg_log_info("stopping the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 0);
+
+	success = true;
+
+	return 0;
+}
diff --git a/src/bin/pg_subscriber/t/001_basic.pl b/src/bin/pg_subscriber/t/001_basic.pl
new file mode 100644
index 0000000000..824e7d7906
--- /dev/null
+++ b/src/bin/pg_subscriber/t/001_basic.pl
@@ -0,0 +1,41 @@
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use Cwd;
+use Config;
+use File::Basename qw(basename dirname);
+use File::Path qw(rmtree);
+use Fcntl qw(:seek);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_subscriber');
+program_version_ok('pg_subscriber');
+program_options_handling_ok('pg_subscriber');
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+
+my $node = PostgreSQL::Test::Cluster->new('publisher');
+$node->init(allows_streaming => 'logical');
+$node->start;
+
+$node->command_fails_like(
+	['pg_subscriber'],
+	qr/no subscriber data directory specified/,
+	'target directory must be specified');
+$node->command_fails_like(
+	['pg_subscriber', '-D', $tempdir],
+	qr/no publisher connection string specified/,
+	'publisher connection string must be specified');
+$node->command_fails_like(
+	['pg_subscriber', '-D', $tempdir, '-P', 'dbname=postgres'],
+	qr/no subscriber connection string specified/,
+	'subscriber connection string must be specified');
+$node->command_fails_like(
+	['pg_subscriber', '-D', $tempdir, '-P', 'dbname=postgres', '-S', 'dbname=postgres'],
+	qr/is not a database cluster directory/,
+	'directory must be a real database cluster directory');
+
+done_testing();
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 105f5c72a2..330d312ee6 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -55,7 +55,7 @@ my @contrib_excludes = (
 # Set of variables for frontend modules
 my $frontend_defines = { 'initdb' => 'FRONTEND' };
 my @frontend_uselibpq =
-  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb');
+  ('pg_amcheck', 'pg_ctl', 'pg_upgrade', 'pgbench', 'psql', 'initdb', 'pg_subscriber');
 my @frontend_uselibpgport = (
 	'pg_amcheck',    'pg_archivecleanup',
 	'pg_test_fsync', 'pg_test_timing',
-- 
2.30.2
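
A note on the name-length comments in the patch: the budgets can be checked
with a small standalone sketch. It is not part of the patch. The helper names
build_slot_name() and build_pub_name() are hypothetical, and
SKETCH_NAMEDATALEN assumes the default NAMEDATALEN of 64. Only the two format
strings are taken from the patch itself.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Assumption: NAMEDATALEN is 64, as in a default PostgreSQL build. */
#define SKETCH_NAMEDATALEN 64

/*
 * Hypothetical helper: format the per-database replication slot name the
 * same way the patch does. Returns the name length (excluding the
 * terminator), or -1 if the name would have been truncated.
 */
static int
build_slot_name(char *buf, size_t buflen, unsigned int dboid, int pid)
{
	int			n;

	assert(buf != NULL && buflen > 0);
	n = snprintf(buf, buflen, "pg_subscriber_%u_%d", dboid, pid);
	if (n < 0 || (size_t) n >= buflen)
		return -1;
	return n;
}

/*
 * Hypothetical helper: format the per-database publication name the same
 * way the patch does. Same return convention as build_slot_name().
 */
static int
build_pub_name(char *buf, size_t buflen, unsigned int dboid)
{
	int			n;

	assert(buf != NULL && buflen > 0);
	n = snprintf(buf, buflen, "pg_subscriber_%u", dboid);
	if (n < 0 || (size_t) n >= buflen)
		return -1;
	return n;
}
```

With the largest 32-bit OID and the largest int PID, the slot name is 14 + 10
+ 1 + 10 = 35 characters (36 bytes with the terminator) and the publication
name is 14 + 10 = 24 characters (25 bytes), so both fit comfortably in a
64-byte buffer.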
