From 8525a8f0fd3ed6a52a7fe6ad62567aeb8c8028a7 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 28 Aug 2025 22:26:04 +0530
Subject: [PATCH v6] Support existing publications in pg_createsubscriber

Add a new '--reuse-existing-publications' option to pg_createsubscriber,
allowing users to reuse existing publications instead of failing when they
already exist on the publisher.

Without this option, pg_createsubscriber fails if any specified publication
already exists. With this option, existing publications are reused and
non-existing publications are created automatically with FOR ALL TABLES.

This is useful in environments where publications may have been created by
previous operations or other processes, eliminating the need to know in advance
which publications exist on the publisher.

When publications are reused, they are never dropped during cleanup operations,
ensuring pre-existing publications remain available for other uses.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 37 ++++++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 90 ++++++++++++++++---
 .../t/040_pg_createsubscriber.pl              | 80 +++++++++++++++++
 3 files changed, 197 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..18716d66a61 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -258,6 +258,43 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>--reuse-existing-publications</option></term>
+     <listitem>
+      <para>
+       When this option is specified,
+       <application>pg_createsubscriber</application> will check if each
+       publication specified with <option>--publication</option>already exists
+       on the publisher. If a publication exists, it will be reused without
+       modification. If a publication does not exist, it will be created
+       automatically with <literal>FOR ALL TABLES</literal>.
+      </para>
+      <para>
+       This option requires you to understand exactly which publications exist
+       and how they are configured. If you reuse an existing publication,
+       it will be used as-is with its current table list, filters, and settings.
+       If you specify a publication that doesn't exist, it will be created with
+       <literal>FOR ALL TABLES</literal>, which may replicate more tables than
+       intended.
+      </para>
+      <para>
+       It is strongly recommended to use <option>--dry-run</option> first to
+       verify exactly which publications will be reused and which will be
+       created. The dry-run output will show you whether each specified
+       publication exists on the publisher and what action will be taken.
+       This verification step can prevent unexpected replication behavior and
+       data transfer.
+      </para>
+      <para>
+       When <option>--reuse-existing-publications</option> is used,
+       existing publications will not be dropped during cleanup operations,
+       ensuring they remain available for other uses. Only publications that
+       were created by <application>pg_createsubscriber</application>
+       will be cleaned up.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>--config-file=<replaceable class="parameter">filename</replaceable></option></term>
      <listitem>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..07aeac43641 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -42,6 +42,8 @@ struct CreateSubscriberOptions
 	bool		two_phase;		/* enable-two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
+	bool		reuse_existing_pubs;	/* --reuse-existing-publications
+										 * option */
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
@@ -93,7 +95,7 @@ static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static char *generate_object_name(PGconn *conn);
 static void check_publisher(const struct LogicalRepInfo *dbinfo);
-static char *setup_publisher(struct LogicalRepInfo *dbinfo);
+static char *setup_publisher(struct LogicalRepInfo *dbinfo, const struct CreateSubscriberOptions *opt);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
 							 const char *consistent_lsn);
@@ -114,6 +116,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -263,6 +266,7 @@ usage(void)
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
 			 "                                  file when running target cluster\n"));
 	printf(_("      --publication=NAME          publication name\n"));
+	printf(_("      --reuse-existing-publications             reuse existing publications if they exist\n"));
 	printf(_("      --replication-slot=NAME     replication slot name\n"));
 	printf(_("      --subscription=NAME         subscription name\n"));
 	printf(_("  -V, --version                   output version information, then exit\n"));
@@ -753,6 +757,32 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Check if a publication with the given name exists in the specified database.
+ * Returns true if it exists, false otherwise.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	exists = (PQntuples(res) == 1);
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -760,7 +790,7 @@ generate_object_name(PGconn *conn)
  * set_replication_progress).
  */
 static char *
-setup_publisher(struct LogicalRepInfo *dbinfo)
+setup_publisher(struct LogicalRepInfo *dbinfo, const struct CreateSubscriberOptions *opt)
 {
 	char	   *lsn = NULL;
 
@@ -770,6 +800,7 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 	{
 		PGconn	   *conn;
 		char	   *genname = NULL;
+		bool		make_pub = true;
 
 		conn = connect_database(dbinfo[i].pubconninfo, true);
 
@@ -780,22 +811,42 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		 * no replication slot is specified. It follows the same rule as
 		 * CREATE SUBSCRIPTION.
 		 */
-		if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+		if (dbinfo[i].pubname == NULL || dbinfo[i].subname == NULL || dbinfo[i].replslotname == NULL)
 			genname = generate_object_name(conn);
-		if (num_pubs == 0)
+		if (dbinfo[i].pubname == NULL)
 			dbinfo[i].pubname = pg_strdup(genname);
-		if (num_subs == 0)
+		if (dbinfo[i].subname == NULL)
 			dbinfo[i].subname = pg_strdup(genname);
-		if (num_replslots == 0)
+		if (dbinfo[i].replslotname == NULL)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
+		/*
+		 * Check if publication already exists when
+		 * --reuse-existing-publications is specified
+		 */
+		if (opt->reuse_existing_pubs && check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+		{
+			pg_log_info("using existing publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			make_pub = false;
+		}
+
 		/*
 		 * Create publication on publisher. This step should be executed
 		 * *before* promoting the subscriber to avoid any transactions between
 		 * consistent LSN and the new publication rows (such transactions
 		 * wouldn't see the new publication rows resulting in an error).
 		 */
-		create_publication(conn, &dbinfo[i]);
+		if (make_pub)
+		{
+			create_publication(conn, &dbinfo[i]);
+			dbinfo[i].made_publication = true;
+			if (opt->reuse_existing_pubs)
+				pg_log_info("created publication \"%s\" in database \"%s\"",
+							dbinfo[i].pubname, dbinfo[i].dbname);
+		}
+		else
+			dbinfo[i].made_publication = false;
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1771,8 +1822,14 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	 * those to provide necessary information to the user.
 	 */
 	if (!drop_all_pubs || dry_run)
-		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
-						 &dbinfo->made_publication);
+	{
+		if (!dbinfo->made_publication)
+			drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+							 &dbinfo->made_publication);
+	}
+	else
+		pg_log_info("not dropping existing publication \"%s\" in database \"%s\"",
+					dbinfo->pubname, dbinfo->dbname);
 }
 
 /*
@@ -2047,6 +2104,7 @@ main(int argc, char **argv)
 		{"replication-slot", required_argument, NULL, 3},
 		{"subscription", required_argument, NULL, 4},
 		{"clean", required_argument, NULL, 5},
+		{"reuse-existing-publications", no_argument, NULL, 6},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2095,6 +2153,7 @@ main(int argc, char **argv)
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
 	opt.two_phase = false;
+	opt.reuse_existing_pubs = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -2200,6 +2259,9 @@ main(int argc, char **argv)
 				else
 					pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
 				break;
+			case 6:
+				opt.reuse_existing_pubs = true;
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -2220,6 +2282,8 @@ main(int argc, char **argv)
 			bad_switch = "--replication-slot";
 		else if (num_subs > 0)
 			bad_switch = "--subscription";
+		else if (opt.reuse_existing_pubs)
+			bad_switch = "--reuse-existing-publications";
 
 		if (bad_switch)
 		{
@@ -2319,6 +2383,12 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (opt.reuse_existing_pubs && num_pubs == 0)
+	{
+		pg_log_error("--reuse-existing-publications requires --publication to be specified");
+		exit(1);
+	}
+
 	/* Number of object names must match number of databases */
 	if (num_pubs > 0 && num_pubs != num_dbs)
 	{
@@ -2424,7 +2494,7 @@ main(int argc, char **argv)
 	stop_standby_server(subscriber_dir);
 
 	/* Create the required objects for each database on publisher */
-	consistent_lsn = setup_publisher(dbinfos.dbinfo);
+	consistent_lsn = setup_publisher(dbinfos.dbinfo, &opt);
 
 	/* Write the required recovery parameters */
 	setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..473ae7eac70 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,89 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1,
+	"CREATE PUBLICATION test_pub_existing FOR TABLE tbl1");
+
+# Initialize node_s2 as a fresh standby of node_p for existing/new
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2 with --reuse-existing-publications option for mixed
+# scenario (one existing publication, one new publication)
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+		'--reuse-existing-publications',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Verify that test_pub_new was created in db2
+$result = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($result, '1', 'test_pub_new publication was created in db2');
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub_existing}
+{test_pub_new}),
+	"subscriptions use the correct publications with --reuse-existing-publications"
+);
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.41.0.windows.3

