From fabb1b5d1c35fc86cdfab8dcd7da43e2f572993b Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 28 Aug 2025 22:26:04 +0530
Subject: [PATCH v5] Support existing publications in pg_createsubscriber
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a new '--if-not-exists' option to pg_createsubscriber, allowing users to
reuse existing publications if they are already present, or create them if they
do not exist.
This simplifies publication handling while providing more flexibility when
setting up logical replication.

Key features:
1. New '--if-not-exists' flag changes the behavior of '--publication'.
2. If the publication exists, it is reused.
3. If it does not exist, it is created automatically.
4. Supports per-database specification, consistent with other options.
5. Avoids the complexity of option conflicts and count-matching rules.
6. Provides semantics consistent with SQL’s IF NOT EXISTS syntax.

This design streamlines the CLI, improves usability, and supports scenarios
where some publications are reused while others are created during subscriber
setup.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 31 ++++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 98 ++++++++++++++++---
 .../t/040_pg_createsubscriber.pl              | 80 +++++++++++++++
 3 files changed, 197 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..e7656d764d3 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -258,6 +258,37 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>--if-not-exists</option></term>
+     <listitem>
+      <para>
+       When this option is specified,
+       <application>pg_createsubscriber</application> will check if each
+       publication specified with <option>--publication</option>
+       already exists on the publisher. If a publication exists, it will be
+       reused. If a publication does not exist, it will be created
+       automatically with <literal>FOR ALL TABLES</literal>.
+      </para>
+      <para>
+       This option provides flexibility for mixed scenarios where some
+       publications may already exist while others need to be created.
+       It eliminates the need to know in advance which publications exist on
+       the publisher.
+      </para>
+      <para>
+       When <option>--if-not-exists</option> is used, existing publications will
+       not be dropped during cleanup operations, ensuring they remain available
+       for other uses. Only publications that were created by
+       <application>pg_createsubscriber</application> will be cleaned up.
+      </para>
+      <para>
+       This option follows the same semantics as SQL
+       <literal>IF NOT EXISTS</literal> clauses, providing consistent behavior
+       with other PostgreSQL tools.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>--config-file=<replaceable class="parameter">filename</replaceable></option></term>
      <listitem>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..fc0972eca9d 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -42,6 +42,7 @@ struct CreateSubscriberOptions
 	bool		two_phase;		/* enable-two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
+	bool		if_not_exists;	/* --if-not-exists option */
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
@@ -61,6 +62,8 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	bool		publication_existed;	/* publication existed before we
+										 * started */
 };
 
 /*
@@ -93,7 +96,7 @@ static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static char *generate_object_name(PGconn *conn);
 static void check_publisher(const struct LogicalRepInfo *dbinfo);
-static char *setup_publisher(struct LogicalRepInfo *dbinfo);
+static char *setup_publisher(struct LogicalRepInfo *dbinfo, const struct CreateSubscriberOptions *opt);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
 							 const char *consistent_lsn);
@@ -114,6 +117,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -202,7 +206,7 @@ cleanup_objects_atexit(void)
 			conn = connect_database(dbinfo->pubconninfo, false);
 			if (conn != NULL)
 			{
-				if (dbinfo->made_publication)
+				if (dbinfo->made_publication && !dbinfo->publication_existed)
 					drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
 									 &dbinfo->made_publication);
 				if (dbinfo->made_replslot)
@@ -216,7 +220,7 @@ cleanup_objects_atexit(void)
 				 * that some objects were left on primary and should be
 				 * removed before trying again.
 				 */
-				if (dbinfo->made_publication)
+				if (dbinfo->made_publication && !dbinfo->publication_existed)
 				{
 					pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
 								   dbinfo->pubname,
@@ -263,6 +267,7 @@ usage(void)
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
 			 "                                  file when running target cluster\n"));
 	printf(_("      --publication=NAME          publication name\n"));
+	printf(_("      --if-not-exists             reuse existing publications if they exist\n"));
 	printf(_("      --replication-slot=NAME     replication slot name\n"));
 	printf(_("      --subscription=NAME         subscription name\n"));
 	printf(_("  -V, --version                   output version information, then exit\n"));
@@ -497,6 +502,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 			dbinfo[i].replslotname = NULL;
 		dbinfo[i].made_replslot = false;
 		dbinfo[i].made_publication = false;
+		dbinfo[i].publication_existed = false;
 		/* Fill subscriber attributes */
 		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
 		dbinfo[i].subconninfo = conninfo;
@@ -753,6 +759,31 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Add function to check if publication exists.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists = false;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	exists = (PQntuples(res) == 1);
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -760,7 +791,7 @@ generate_object_name(PGconn *conn)
  * set_replication_progress).
  */
 static char *
-setup_publisher(struct LogicalRepInfo *dbinfo)
+setup_publisher(struct LogicalRepInfo *dbinfo, const struct CreateSubscriberOptions *opt)
 {
 	char	   *lsn = NULL;
 
@@ -780,22 +811,46 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		 * no replication slot is specified. It follows the same rule as
 		 * CREATE SUBSCRIPTION.
 		 */
-		if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+		if (dbinfo[i].pubname == NULL || dbinfo[i].subname == NULL || dbinfo[i].replslotname == NULL)
 			genname = generate_object_name(conn);
-		if (num_pubs == 0)
+		if (dbinfo[i].pubname == NULL)
 			dbinfo[i].pubname = pg_strdup(genname);
-		if (num_subs == 0)
+		if (dbinfo[i].subname == NULL)
 			dbinfo[i].subname = pg_strdup(genname);
-		if (num_replslots == 0)
+		if (dbinfo[i].replslotname == NULL)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
+		/*
+		 * Check if publication already exists when --if-not-exists is
+		 * specified
+		 */
+		if (opt->if_not_exists)
+		{
+			dbinfo[i].publication_existed = check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname);
+			if (dbinfo[i].publication_existed)
+			{
+				pg_log_info("using existing publication \"%s\" in database \"%s\"",
+							dbinfo[i].pubname, dbinfo[i].dbname);
+			}
+		}
+
 		/*
 		 * Create publication on publisher. This step should be executed
 		 * *before* promoting the subscriber to avoid any transactions between
 		 * consistent LSN and the new publication rows (such transactions
 		 * wouldn't see the new publication rows resulting in an error).
 		 */
-		create_publication(conn, &dbinfo[i]);
+		if (opt->if_not_exists && dbinfo[i].publication_existed)
+			dbinfo[i].made_publication = false;
+		else
+		{
+			create_publication(conn, &dbinfo[i]);
+			if (opt->if_not_exists)
+			{
+				pg_log_info("created publication \"%s\" in database \"%s\"",
+							dbinfo[i].pubname, dbinfo[i].dbname);
+			}
+		}
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1771,8 +1826,14 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	 * those to provide necessary information to the user.
 	 */
 	if (!drop_all_pubs || dry_run)
-		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
-						 &dbinfo->made_publication);
+	{
+		if (!dbinfo->publication_existed)
+			drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+							 &dbinfo->made_publication);
+	}
+	else
+		pg_log_info("not dropping existing publication \"%s\" in database \"%s\"",
+					dbinfo->pubname, dbinfo->dbname);
 }
 
 /*
@@ -2047,6 +2108,7 @@ main(int argc, char **argv)
 		{"replication-slot", required_argument, NULL, 3},
 		{"subscription", required_argument, NULL, 4},
 		{"clean", required_argument, NULL, 5},
+		{"if-not-exists", no_argument, NULL, 6},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2095,6 +2157,7 @@ main(int argc, char **argv)
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
 	opt.two_phase = false;
+	opt.if_not_exists = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -2200,6 +2263,9 @@ main(int argc, char **argv)
 				else
 					pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
 				break;
+			case 6:
+				opt.if_not_exists = true;
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -2220,6 +2286,8 @@ main(int argc, char **argv)
 			bad_switch = "--replication-slot";
 		else if (num_subs > 0)
 			bad_switch = "--subscription";
+		else if (opt.if_not_exists)
+			bad_switch = "--if-not-exists";
 
 		if (bad_switch)
 		{
@@ -2319,6 +2387,12 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (opt.if_not_exists && num_pubs == 0)
+	{
+		pg_log_error("--if-not-exists requires --publication to be specified");
+		exit(1);
+	}
+
 	/* Number of object names must match number of databases */
 	if (num_pubs > 0 && num_pubs != num_dbs)
 	{
@@ -2424,7 +2498,7 @@ main(int argc, char **argv)
 	stop_standby_server(subscriber_dir);
 
 	/* Create the required objects for each database on publisher */
-	consistent_lsn = setup_publisher(dbinfos.dbinfo);
+	consistent_lsn = setup_publisher(dbinfos.dbinfo, &opt);
 
 	/* Write the required recovery parameters */
 	setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..95e7a056a21 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,89 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1,
+	"CREATE PUBLICATION test_pub_existing FOR TABLE tbl1");
+
+# Initialize node_s2 as a fresh standby of node_p for table-level
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2 with --if-not-exists option for mixed
+# scenario (one existing publication, one new publication)
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+		'--if-not-exists',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Verify that test_pub_new was created in db2
+$result = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($result, '1', 'test_pub_new publication was created in db2');
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub_existing}
+{test_pub_new}),
+	"subscriptions use the correct publications with --if-not-exists in $db1 and $db2"
+);
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

