From 96b89e2ec34a8a948cc771a30c62d95b1c8112df Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 28 Aug 2025 22:26:04 +0530
Subject: [PATCH v8] Support existing publications in pg_createsubscriber

Allow pg_createsubscriber to reuse existing publications instead of failing
when they already exist on the publisher.

Previously, pg_createsubscriber would fail if any specified publication already
existed. Now, existing publications are reused as-is with their current
configuration, and non-existing publications are createdcautomatically with
FOR ALL TABLES.

This change provides flexibility when working with mixed scenarios of existing
and new publications. Users should verify that existing publications have the
desired configuration before reusing them, and can use --dry-run to see which
publications will be reused and which will be created.

When publications are reused, they are never dropped during cleanup operations,
ensuring pre-existing publications remain available for other uses.
Only publications created by pg_createsubscriber are cleaned up.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  17 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   |  61 +++++++--
 .../t/040_pg_createsubscriber.pl              | 116 ++++++++++++++++++
 3 files changed, 184 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..3d057013a97 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -285,6 +285,23 @@ PostgreSQL documentation
        a generated name is assigned to the publication name. This option cannot
        be used together with <option>--all</option>.
       </para>
+      <para>
+       If a publication with the specified name already exists on the publisher,
+       it will be reused as-is with its current configuration, including its
+       table list, row filters, column filters, and all other settings.
+       If a publication does not exist, it will be created automatically with
+       <literal>FOR ALL TABLES</literal>.
+      </para>
+      <para>
+       Use <option>--dry-run</option> to see which publications will be reused
+       and which will be created before running the actual command.
+       When publications are reused, they will not be dropped during cleanup
+       operations, ensuring they remain available for other uses.
+       Only publications created by
+       <application>pg_createsubscriber</application> on the target server will
+       be cleaned up if the operation fails. Publications on the publisher
+       server are never modified or dropped by cleanup operations.
+      </para>
      </listitem>
     </varlistentry>
 
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..c8155959ef4 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -114,6 +114,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -753,6 +754,32 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Check if a publication with the given name exists in the specified database.
+ * Returns true if it exists, false otherwise.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	exists = (PQntuples(res) == 1);
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -789,13 +816,27 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		if (num_replslots == 0)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
-		/*
-		 * Create publication on publisher. This step should be executed
-		 * *before* promoting the subscriber to avoid any transactions between
-		 * consistent LSN and the new publication rows (such transactions
-		 * wouldn't see the new publication rows resulting in an error).
-		 */
-		create_publication(conn, &dbinfo[i]);
+		if (check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+		{
+			/* Reuse existing publication on publisher. */
+			pg_log_info("using existing publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			dbinfo[i].made_publication = false;
+		}
+		else
+		{
+			/*
+			 * Create publication on publisher. This step should be executed
+			 * *before* promoting the subscriber to avoid any transactions
+			 * between consistent LSN and the new publication rows (such
+			 * transactions wouldn't see the new publication rows resulting in
+			 * an error).
+			 */
+			create_publication(conn, &dbinfo[i]);
+			pg_log_info("created publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			dbinfo[i].made_publication = true;
+		}
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1767,10 +1808,10 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	}
 
 	/*
-	 * In dry-run mode, we don't create publications, but we still try to drop
-	 * those to provide necessary information to the user.
+	 * Only drop publications that were created by pg_createsubscriber during
+	 * this operation. Pre-existing publications are preserved.
 	 */
-	if (!drop_all_pubs || dry_run)
+	if (dbinfo->made_publication)
 		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
 						 &dbinfo->made_publication);
 }
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..9d9c7259598 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,125 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1,
+	"CREATE PUBLICATION test_pub_existing FOR TABLE tbl1");
+
+# Initialize node_s2 as a fresh standby of node_p for existing/new
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2 with --dry-run
+($stdout, $stderr) = run_command(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--dry-run',
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+	],
+	'run pg_createsubscriber --dry-run on node S2');
+
+like(
+	$stderr,
+	qr/using existing publication "test_pub_existing"/,
+	'dry-run logs reuse of existing publication');
+like(
+	$stderr,
+	qr/created publication "test_pub_new"/,
+	'dry-run logs creation of new publication');
+
+# Verify the existing publication is still there and unchanged
+my $existing_pub_count = $node_p->safe_psql($db1,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_existing'"
+);
+is($existing_pub_count, '1',
+	'existing publication remains unchanged after dry-run');
+
+# Verify no actual publications were created during dry-run
+my $pub_count_after_dry_run = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($pub_count_after_dry_run, '0',
+	'dry-run did not actually create publications');
+
+# Run pg_createsubscriber on node S2 without --dry-run
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Verify that test_pub_new was created in db2
+$result = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($result, '1', 'test_pub_new publication was created in db2');
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub_existing}
+{test_pub_new}),
+	"subscriptions use the correct publications");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

