From 4c5fe9ea67329b374407a6c61b8154a42462a623 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 28 Aug 2025 22:26:04 +0530
Subject: [PATCH v4] Support existing publications in pg_createsubscriber

Add the --existing-publication option to pg_createsubscriber, allowing users to
specify existing publications instead of creating new ones.
This provides more flexibility when setting up logical replication,
particularly in scenarios where publications are managed separately or need to
be reused across multiple subscribers.

Key features:
1. New --existing-publication option accepts existing publication names.
2. Prevents mixing --publication and --existing-publication options.
3. Supports per-database specification like other options.
4. Includes validation that specified publications exist.

The number of existing publication names must match the number of database
names, following the same pattern as other pg_createsubscriber options.
This design aligns with the principle of other PostgreSQL tools that allow both
creation of new objects and reuse of existing ones.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  38 ++++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 117 ++++++++++++++++--
 .../t/040_pg_createsubscriber.pl              |  90 ++++++++++++++
 3 files changed, 233 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..d1c92910a74 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -258,6 +258,44 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>--existing-publication=<replaceable class="parameter">publication_name</replaceable></option></term>
+     <listitem>
+      <para>
+       Specifies the name of an existing publication on the publisher to use
+       for logical replication. This option can be specified multiple times
+       to provide publication names for multiple databases when using multiple
+       <option>--database</option> options.
+      </para>
+      <para>
+       The specified publication must already exist on the publisher database.
+       When this option is used, <application>pg_createsubscriber</application>
+       will not create a new publication but will use the existing one.
+       During cleanup operations (including when using <option>--clean</option>),
+       existing publications specified with this option will not be dropped,
+       ensuring they remain available for other uses.
+      </para>
+      <para>
+       This option cannot be used together with <option>--publication</option>.
+       Use either <option>--publication</option> to create new publications
+       or <option>--existing-publication</option> to use existing ones, but
+       not both.
+      </para>
+      <para>
+       If neither <option>--publication</option> nor
+       <option>--existing-publication</option> is specified,
+       <application>pg_createsubscriber</application> will automatically
+       generate publication names and create new publications.
+      </para>
+      <para>
+       The number of existing publication names specified must equal the number
+       of database names when multiple databases are being configured. Each
+       existing publication will be used for its corresponding database in
+       the order specified.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>--config-file=<replaceable class="parameter">filename</replaceable></option></term>
      <listitem>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..3096b1f96f8 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -42,6 +42,8 @@ struct CreateSubscriberOptions
 	bool		two_phase;		/* enable-two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
+	SimpleStringList existing_pub_names;	/* list of existing publication
+											 * names */
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
@@ -61,6 +63,8 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	bool		is_existing_publication;	/* true if --existing-publication
+											 * was used */
 };
 
 /*
@@ -114,6 +118,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -161,7 +166,6 @@ enum WaitPMResult
 	POSTMASTER_STILL_STARTING
 };
 
-
 /*
  * Cleanup objects that were created by pg_createsubscriber if there is an
  * error.
@@ -202,7 +206,7 @@ cleanup_objects_atexit(void)
 			conn = connect_database(dbinfo->pubconninfo, false);
 			if (conn != NULL)
 			{
-				if (dbinfo->made_publication)
+				if (dbinfo->made_publication && !dbinfo->is_existing_publication)
 					drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
 									 &dbinfo->made_publication);
 				if (dbinfo->made_replslot)
@@ -216,7 +220,7 @@ cleanup_objects_atexit(void)
 				 * that some objects were left on primary and should be
 				 * removed before trying again.
 				 */
-				if (dbinfo->made_publication)
+				if (dbinfo->made_publication && !dbinfo->is_existing_publication)
 				{
 					pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
 								   dbinfo->pubname,
@@ -263,6 +267,7 @@ usage(void)
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
 			 "                                  file when running target cluster\n"));
 	printf(_("      --publication=NAME          publication name\n"));
+	printf(_("      --existing-publication=NAME use an existing publication name\n"));
 	printf(_("      --replication-slot=NAME     replication slot name\n"));
 	printf(_("      --subscription=NAME         subscription name\n"));
 	printf(_("  -V, --version                   output version information, then exit\n"));
@@ -466,14 +471,17 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 {
 	struct LogicalRepInfo *dbinfo;
 	SimpleStringListCell *pubcell = NULL;
+	SimpleStringListCell *existing_pub_cell = NULL;
 	SimpleStringListCell *subcell = NULL;
 	SimpleStringListCell *replslotcell = NULL;
 	int			i = 0;
 
 	dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
 
-	if (num_pubs > 0)
+	if (opt->pub_names.head > 0)
 		pubcell = opt->pub_names.head;
+	if (opt->existing_pub_names.head != NULL)
+		existing_pub_cell = opt->existing_pub_names.head;
 	if (num_subs > 0)
 		subcell = opt->sub_names.head;
 	if (num_replslots > 0)
@@ -487,10 +495,21 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
 		dbinfo[i].pubconninfo = conninfo;
 		dbinfo[i].dbname = cell->val;
-		if (num_pubs > 0)
+		if (pubcell != NULL)
+		{
 			dbinfo[i].pubname = pubcell->val;
+			dbinfo[i].is_existing_publication = false;
+		}
+		else if (existing_pub_cell != NULL)
+		{
+			dbinfo[i].pubname = existing_pub_cell->val;
+			dbinfo[i].is_existing_publication = true;
+		}
 		else
+		{
 			dbinfo[i].pubname = NULL;
+			dbinfo[i].is_existing_publication = false;
+		}
 		if (num_replslots > 0)
 			dbinfo[i].replslotname = replslotcell->val;
 		else
@@ -506,8 +525,9 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 			dbinfo[i].subname = NULL;
 		/* Other fields will be filled later */
 
-		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+		pg_log_debug("publisher(%d): publication: %s (%s); replication slot: %s ; connection string: %s", i,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+					 dbinfo[i].is_existing_publication ? "existing" : "new",
 					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
 					 dbinfo[i].pubconninfo);
 		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
@@ -515,8 +535,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 					 dbinfo[i].subconninfo,
 					 dbinfos.two_phase ? "true" : "false");
 
-		if (num_pubs > 0)
+		if (pubcell != NULL)
 			pubcell = pubcell->next;
+		if (existing_pub_cell != NULL)
+			existing_pub_cell = existing_pub_cell->next;
 		if (num_subs > 0)
 			subcell = subcell->next;
 		if (num_replslots > 0)
@@ -753,6 +775,31 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Add function to check if publication exists.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists = false;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
+		exists = true;
+	else if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -780,22 +827,34 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		 * no replication slot is specified. It follows the same rule as
 		 * CREATE SUBSCRIPTION.
 		 */
-		if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+		if (dbinfo[i].pubname == NULL || dbinfo[i].subname == NULL || dbinfo[i].replslotname == NULL)
 			genname = generate_object_name(conn);
-		if (num_pubs == 0)
+		if (dbinfo[i].pubname == NULL)
 			dbinfo[i].pubname = pg_strdup(genname);
 		if (num_subs == 0)
 			dbinfo[i].subname = pg_strdup(genname);
 		if (num_replslots == 0)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
+		/* If using an existing publication, verify it exists. */
+		if (dbinfo[i].is_existing_publication)
+		{
+			if (!check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+				pg_fatal("publication \"%s\" does not exist in database \"%s\"",
+						 dbinfo[i].pubname, dbinfo[i].dbname);
+		}
+
 		/*
 		 * Create publication on publisher. This step should be executed
 		 * *before* promoting the subscriber to avoid any transactions between
 		 * consistent LSN and the new publication rows (such transactions
 		 * wouldn't see the new publication rows resulting in an error).
 		 */
-		create_publication(conn, &dbinfo[i]);
+		if (!dbinfo[i].is_existing_publication)
+			create_publication(conn, &dbinfo[i]);
+		else
+			pg_log_info("using existing publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1770,9 +1829,12 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	 * In dry-run mode, we don't create publications, but we still try to drop
 	 * those to provide necessary information to the user.
 	 */
-	if (!drop_all_pubs || dry_run)
+	else if (!dbinfo->is_existing_publication)
 		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
 						 &dbinfo->made_publication);
+	else
+		pg_log_info("not dropping existing publication \"%s\" in database \"%s\"",
+					dbinfo->pubname, dbinfo->dbname);
 }
 
 /*
@@ -2047,6 +2109,7 @@ main(int argc, char **argv)
 		{"replication-slot", required_argument, NULL, 3},
 		{"subscription", required_argument, NULL, 4},
 		{"clean", required_argument, NULL, 5},
+		{"existing-publication", required_argument, NULL, 6},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2058,6 +2121,7 @@ main(int argc, char **argv)
 	char	   *pub_base_conninfo;
 	char	   *sub_base_conninfo;
 	char	   *dbname_conninfo = NULL;
+	int			total_pubs_configured = 0;
 
 	uint64		pub_sysid;
 	uint64		sub_sysid;
@@ -2200,6 +2264,15 @@ main(int argc, char **argv)
 				else
 					pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
 				break;
+			case 6:
+				if (!simple_string_list_member(&opt.existing_pub_names, optarg))
+				{
+					simple_string_list_append(&opt.existing_pub_names, optarg);
+					num_pubs++;
+				}
+				else
+					pg_fatal("existing publication \"%s\" specified more than once for --existing-publication", optarg);
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -2214,12 +2287,14 @@ main(int argc, char **argv)
 
 		if (num_dbs > 0)
 			bad_switch = "--database";
-		else if (num_pubs > 0)
+		else if (opt.pub_names.head > 0)
 			bad_switch = "--publication";
 		else if (num_replslots > 0)
 			bad_switch = "--replication-slot";
 		else if (num_subs > 0)
 			bad_switch = "--subscription";
+		else if (opt.existing_pub_names.head != NULL)
+			bad_switch = "--existing-publication";
 
 		if (bad_switch)
 		{
@@ -2319,6 +2394,13 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (opt.pub_names.head > 0 && opt.existing_pub_names.head != NULL)
+	{
+		pg_log_error("options --publication and --existing-publication cannot be used together");
+		pg_log_error_hint("Specify either new publications to create or existing publications to use, but not both.");
+		exit(1);
+	}
+
 	/* Number of object names must match number of databases */
 	if (num_pubs > 0 && num_pubs != num_dbs)
 	{
@@ -2327,6 +2409,17 @@ main(int argc, char **argv)
 							num_pubs, num_dbs);
 		exit(1);
 	}
+
+	for (SimpleStringListCell *cell = opt.existing_pub_names.head; cell != NULL; cell = cell->next)
+		total_pubs_configured++;
+
+	if (total_pubs_configured > 0 && total_pubs_configured != num_dbs)
+	{
+		pg_log_error("wrong number of existing publication names specified");
+		pg_log_error_detail("The number of specified existing publication names (%d) must match the number of specified database names (%d).",
+							total_pubs_configured, num_dbs);
+		exit(1);
+	}
 	if (num_subs > 0 && num_subs != num_dbs)
 	{
 		pg_log_error("wrong number of subscription names specified");
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..c49a5ec2e88 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,99 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub3 FOR TABLE tbl1");
+$node_p->safe_psql($db2, "CREATE PUBLICATION test_pub4 FOR TABLE tbl2");
+
+# Initialize node_s2 as a fresh standby of node_p for table-level
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber with invalid --existing-publication option
+# (conflict with --publication)
+command_fails_like(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--publication' => 'pub1',
+		'--database' => $db1,
+		'--existing-publication' => 'test_pub3',
+	],
+	qr/options --publication and --existing-publication cannot be used together/,
+	'pg_createsubscriber rejects --publication with --existing-publication');
+
+# Run pg_createsubscriber on node S with --existing-publication option.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--existing-publication' => 'test_pub3',
+		'--existing-publication' => 'test_pub4',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub3}
+{test_pub4}),
+	"subscriptions use the correct publications in $db1 and $db2");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

