From fea696fb4bbac7dd42c0867b3b020e2317cb4db1 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 28 Feb 2025 14:44:23 +0530
Subject: [PATCH v13] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--cleanup-existing-publications' option in the
'pg_createsubscriber' utility.

When this option is specified, all publications in the specified databases are
removed from the subscriber, regardless of their origin. This includes the
publications that were replicated from the primary server during streaming
replication, which become redundant once the setup transitions to logical
replication and serve no further purpose. Because manually created
publications are dropped as well, users should back them up before running
this command.

By default, publications are preserved to avoid unintended data loss.
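
For illustration, a hypothetical invocation using the new option could look
like the following (the data directory, connection string, and database names
are placeholders):

    pg_createsubscriber --pgdata /path/to/standby_data \
        --publisher-server 'host=primary port=5432' \
        --database db1 --database db2 \
        --cleanup-existing-publications

With this, all publications in db1 and db2 on the subscriber would be dropped
before the subscriptions are created.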
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  11 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   |  89 ++++++---
 .../t/040_pg_createsubscriber.pl              | 187 +++++++++++++-----
 3 files changed, 210 insertions(+), 77 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index b4b996236e4..5878eef0ea0 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,17 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-c</option></term>
+     <term><option>--cleanup-existing-publications</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications from the specified databases on the
+       target server.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index a5a2d61165d..3b498d5047e 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -44,6 +44,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		drop_publications;	/* drop all publications */
 };
 
 /* per-database publication/subscription info */
@@ -91,7 +92,8 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn,
+							 bool cleanup_existing_publications);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -109,7 +111,10 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication_by_name(PGconn *conn, const char *pubname,
+									 const char *dbname);
+static void check_and_drop_existing_publications(PGconn *conn,
+												 const char *dbname);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -192,7 +197,8 @@ cleanup_objects_atexit(void)
 			if (conn != NULL)
 			{
 				if (dbinfos.dbinfo[i].made_publication)
-					drop_publication(conn, &dbinfos.dbinfo[i]);
+					drop_publication_by_name(conn, dbinfos.dbinfo[i].pubname,
+											 dbinfos.dbinfo[i].dbname);
 				if (dbinfos.dbinfo[i].made_replslot)
 					drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
 				disconnect_database(conn, false);
@@ -234,6 +240,8 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -c, --cleanup-existing-publications\n"
+			 "                                  drop all publications from specified subscriber databases\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1171,10 +1179,13 @@ check_and_drop_existing_subscriptions(PGconn *conn,
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
- * replication setup.
+ * replication setup. If 'cleanup_existing_publications' is true, existing
+ * publications on the subscriber will be dropped before creating new
+ * subscriptions.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool cleanup_existing_publications)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1192,11 +1203,16 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
 
 		/*
-		 * Since the publication was created before the consistent LSN, it is
-		 * available on the subscriber when the physical replica is promoted.
-		 * Remove publications from the subscriber because it has no use.
+		 * Since the publication was created before the consistent LSN, it
+		 * remains on the subscriber even after the physical replica is
+		 * promoted. Remove this publication from the subscriber because it
+		 * has no use. Additionally, if requested, drop all pre-existing
+		 * publications.
 		 */
-		drop_publication(conn, &dbinfo[i]);
+		if (cleanup_existing_publications)
+			check_and_drop_existing_publications(conn, dbinfo[i].dbname);
+		else
+			drop_publication_by_name(conn, dbinfo[i].pubname, dbinfo[i].dbname);
 
 		create_subscription(conn, &dbinfo[i]);
 
@@ -1661,25 +1677,46 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop all existing publications in the given database on the subscriber.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+check_and_drop_existing_publications(PGconn *conn, const char *dbname)
 {
-	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
-	char	   *pubname_esc;
 
-	Assert(conn != NULL);
+	pg_log_info("dropping all existing publications in database \"%s\"",
+				dbname);
 
-	pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+	/* Fetch all publication names */
+	res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		return;
+	}
 
-	pg_log_info("dropping publication \"%s\" in database \"%s\"",
-				dbinfo->pubname, dbinfo->dbname);
+	/* Drop each publication */
+	for (int i = 0; i < PQntuples(res); i++)
+		drop_publication_by_name(conn, PQgetvalue(res, i, 0), dbname);
 
-	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
+	PQclear(res);
+	pg_log_info("dropped all publications in database \"%s\"", dbname);
+}
 
-	PQfreemem(pubname_esc);
+/* Drop the specified publication in the given database. */
+static void
+drop_publication_by_name(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	char	   *pubname_esc = PQescapeIdentifier(conn, pubname,
+												 strlen(pubname));
+
+	pg_log_info("dropping publication \"%s\" in database \"%s\"", pubname, dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
 	pg_log_debug("command is: %s", str->data);
 
@@ -1689,8 +1726,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
 			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-			dbinfo->made_publication = false;	/* don't try again. */
+						 pubname, dbname, PQresultErrorMessage(res));
+			dbinfos.dbinfo->made_publication = false;	/* don't try again. */
 
 			/*
 			 * Don't disconnect and exit here. This routine is used by primary
@@ -1703,6 +1740,7 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		PQclear(res);
 	}
 
+	PQfreemem(pubname_esc);
 	destroyPQExpBuffer(str);
 }
 
@@ -1907,6 +1945,7 @@ main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"cleanup-existing-publications", no_argument, NULL, 'c'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1976,6 +2015,7 @@ main(int argc, char **argv)
 		0
 	};
 	opt.recovery_timeout = 0;
+	opt.drop_publications = false;
 
 	/*
 	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
@@ -1993,11 +2033,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "cd:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'c':
+				opt.drop_publications = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2278,7 +2321,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfos.dbinfo, consistent_lsn);
+	setup_subscriber(dbinfos.dbinfo, consistent_lsn, opt.drop_publications);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c35fa108ce3..d2e46bee9c4 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -152,16 +152,16 @@ $node_p->safe_psql($db2,
 
 # Set up node S as standby linking to node P
 $node_p->backup('backup_1');
-my $node_s = PostgreSQL::Test::Cluster->new('node_s');
-$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
-$node_s->append_conf(
+my $node_s1 = PostgreSQL::Test::Cluster->new('node_s1');
+$node_s1->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s1->append_conf(
 	'postgresql.conf', qq[
 primary_slot_name = '$slotname'
 primary_conninfo = '$pconnstr dbname=postgres'
 hot_standby_feedback = on
 ]);
-$node_s->set_standby_mode();
-$node_s->start;
+$node_s1->set_standby_mode();
+$node_s1->start;
 
 # Set up node T as standby linking to node P then promote it
 my $node_t = PostgreSQL::Test::Cluster->new('node_t');
@@ -192,10 +192,10 @@ command_fails(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 	],
@@ -216,9 +216,9 @@ command_fails(
 	'subscriber data directory is not a copy of the source database cluster');
 
 # Set up node C as standby linking to node S
-$node_s->backup('backup_2');
+$node_s1->backup('backup_2');
 my $node_c = PostgreSQL::Test::Cluster->new('node_c');
-$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1);
+$node_c->init_from_backup($node_s1, 'backup_2', has_streaming => 1);
 $node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef);
 $node_c->set_standby_mode();
 
@@ -229,7 +229,7 @@ command_fails(
 		'--verbose',
 		'--dry-run',
 		'--pgdata' => $node_c->data_dir,
-		'--publisher-server' => $node_s->connstr($db1),
+		'--publisher-server' => $node_s1->connstr($db1),
 		'--socketdir' => $node_c->host,
 		'--subscriber-port' => $node_c->port,
 		'--database' => $db1,
@@ -246,16 +246,16 @@ max_wal_senders = 1
 max_worker_processes = 2
 });
 $node_p->restart;
-$node_s->stop;
+$node_s1->stop;
 command_fails(
 	[
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 
@@ -272,7 +272,7 @@ max_worker_processes = 8
 });
 
 # Check some unmet conditions on node S
-$node_s->append_conf(
+$node_s1->append_conf(
 	'postgresql.conf', q{
 max_replication_slots = 1
 max_logical_replication_workers = 1
@@ -283,15 +283,15 @@ command_fails(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 	],
 	'standby contains unmet conditions on node S');
-$node_s->append_conf(
+$node_s1->append_conf(
 	'postgresql.conf', q{
 max_replication_slots = 10
 max_logical_replication_workers = 4
@@ -305,12 +305,12 @@ my $fslotname = 'failover_slot';
 $node_p->safe_psql($db1,
 	"SELECT pg_create_logical_replication_slot('$fslotname', 'pgoutput', false, false, true)"
 );
-$node_s->start;
+$node_s1->start;
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the failover slot.
-$node_p->wait_for_replay_catchup($node_s);
-$node_s->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
-my $result = $node_s->safe_psql('postgres',
+$node_p->wait_for_replay_catchup($node_s1);
+$node_s1->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
+my $result = $node_s1->safe_psql('postgres',
 	"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
 );
 is($result, 'failover_slot', 'failover slot is synced');
@@ -321,15 +321,30 @@ is($result, 'failover_slot', 'failover slot is synced');
 # slot) xmin on standby could be ahead of the remote slot leading
 # to failure in synchronization.
 $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('second row')");
-$node_p->wait_for_replay_catchup($node_s);
+$node_p->wait_for_replay_catchup($node_s1);
 
 # Create subscription to test its removal
 my $dummy_sub = 'regress_sub_dummy';
 $node_p->safe_psql($db1,
 	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
-$node_p->wait_for_replay_catchup($node_s);
-$node_s->stop;
+$node_p->wait_for_replay_catchup($node_s1);
+
+# Create user-defined publications, wait for streaming replication to sync them
+# to the standby, then verify that '--cleanup-existing-publications'
+# removes them.
+$node_p->safe_psql(
+	$db1, qq(
+	CREATE PUBLICATION test_pub1 FOR ALL TABLES;
+	CREATE PUBLICATION test_pub2 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s1);
+
+is($node_s1->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication"),
+	'2', 'two publications created before --cleanup-existing-publications is run');
+
+$node_s1->stop;
 
 # dry run mode on node S
 command_ok(
@@ -338,10 +353,10 @@ command_ok(
 		'--verbose',
 		'--dry-run',
 		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--publication' => 'pub1',
 		'--publication' => 'pub2',
 		'--subscription' => 'sub1',
@@ -352,10 +367,11 @@ command_ok(
 	'run pg_createsubscriber --dry-run on node S');
 
 # Check if node S is still a standby
-$node_s->start;
-is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
-	't', 'standby is in recovery');
-$node_s->stop;
+$node_s1->start;
+is( $node_s1->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
+	't',
+	'standby is in recovery');
+$node_s1->stop;
 
 # pg_createsubscriber can run without --databases option
 command_ok(
@@ -363,33 +379,34 @@ command_ok(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--replication-slot' => 'replslot1',
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
-# to show more information.
+# Run pg_createsubscriber on node S using '--cleanup-existing-publications'.
+# --verbose is used twice to show more information.
 # In passing, also test the --enable-two-phase option
 command_ok(
 	[
 		'pg_createsubscriber',
 		'--verbose', '--verbose',
 		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--publication' => 'pub1',
 		'--publication' => 'pub2',
 		'--replication-slot' => 'replslot1',
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
-		'--enable-two-phase'
+		'--enable-two-phase',
+		'--cleanup-existing-publications',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,11 +423,16 @@ $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
 $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 
 # Start subscriber
-$node_s->start;
+$node_s1->start;
+
+# Confirm publications are removed from the subscriber node
+is($node_s1->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+	'0',
+	'all publications dropped after --cleanup-existing-publications is run');
 
 # Verify that all subtwophase states are pending or enabled,
 # e.g. there are no subscriptions where subtwophase is disabled ('d')
-is( $node_s->safe_psql(
+is( $node_s1->safe_psql(
 		'postgres',
 		"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
 	),
@@ -418,50 +440,107 @@ is( $node_s->safe_psql(
 	'subscriptions are created with the two-phase option enabled');
 
 # Confirm the pre-existing subscription has been removed
-$result = $node_s->safe_psql(
+$result = $node_s1->safe_psql(
 	'postgres', qq(
 	SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
 ));
 is($result, qq(0), 'pre-existing subscription was dropped');
 
 # Get subscription names
-$result = $node_s->safe_psql(
+$result = $node_s1->safe_psql(
 	'postgres', qq(
 	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
 ));
 my @subnames = split("\n", $result);
 
 # Wait subscriber to catch up
-$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
-$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+$node_s1->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s1->wait_for_subscription_sync($node_p, $subnames[1]);
 
 # Confirm the failover slot has been removed
-$result = $node_s->safe_psql($db1,
+$result = $node_s1->safe_psql($db1,
 	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$fslotname'"
 );
 is($result, qq(0), 'failover slot was removed');
 
 # Check result in database $db1
-$result = $node_s->safe_psql($db1, 'SELECT * FROM tbl1');
+$result = $node_s1->safe_psql($db1, 'SELECT * FROM tbl1');
 is( $result, qq(first row
 second row
 third row),
 	"logical replication works in database $db1");
 
 # Check result in database $db2
-$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
+$result = $node_s1->safe_psql($db2, 'SELECT * FROM tbl2');
 is($result, qq(row 1), "logical replication works in database $db2");
 
 # Different system identifier?
 my $sysid_p = $node_p->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
-my $sysid_s = $node_s->safe_psql('postgres',
+my $sysid_s = $node_s1->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Reuse P as primary
+# Set up node S2 as standby linking to node P
+$node_p->backup('backup_3');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_3', has_streaming => 1);
+$node_s2->append_conf(
+	'postgresql.conf', qq[
+primary_conninfo = '$pconnstr'
+hot_standby_feedback = on
+max_logical_replication_workers = 5
+]);
+$node_s2->set_standby_mode();
+$node_s2->start;
+
+# Create a user database on the publisher for this test
+my $db3 = generate_db($node_p, 'regression', 91, 127, '');
+
+# Create user-defined publications
+$node_p->safe_psql($db3, "CREATE PUBLICATION test_pub3 FOR ALL TABLES;");
+$node_p->safe_psql($db3, "CREATE PUBLICATION test_pub4 FOR ALL TABLES;");
+
+$node_p->wait_for_replay_catchup($node_s2);
+
+# Verify the existing publications
+is($node_s2->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication"),
+	'2', 'two publications are created before running pg_createsubscriber');
+
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2 without using
+# '--cleanup-existing-publications'.
+# --verbose is used twice to show more information.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db3),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db3,
+	],
+	'run pg_createsubscriber without --cleanup-existing-publications on node S2'
+);
+
+$node_s2->start;
+
+# Confirm publications still remain after running 'pg_createsubscriber'
+is($node_s2->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;"),
+	'2', 'all publications remain after running pg_createsubscriber');
+
+# Drop the newly created publications
+$node_p->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub3;");
+$node_p->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub4;");
+
 # clean up
 $node_p->teardown_node;
-$node_s->teardown_node;
+$node_s1->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

