From 728184a84167d5648f4ac687fe130b8fb5c62e98 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 23 Jan 2025 12:39:03 +0530
Subject: [PATCH v2] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--cleanup-publisher-objects' option in the
'pg_createsubscriber utility'.
This feature ensures a clean and streamlined setup of logical replication by
removing stale or unnecessary publications from the subscriber node.
These publications, replicated during streaming replication, become redundant
after converting to logical replication and serve no further purpose.
By ensuring that outdated publications are removed, it helps avoid potential
conflicts and simplifies replication management.

A new 'drop_all_publications()' function is added to fetch and drop all
publications on the subscriber node.
Since this cleanup is not required when upgrading streaming replication
clusters,this feature is supported only when the '--cleanup-publisher-objects'
option is specified, allowing users to choose accordingly.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 11 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 74 ++++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              | 21 +++++-
 3 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 26b8e64a4e..7c45fe960e 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,17 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-C</option></term>
+     <term><option>--cleanup-publisher-objects</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications on the subscriber node before creating
+       a subscription.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index faf18ccf13..c49e987f69 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -43,6 +43,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		cleanup_publisher_objects;	/* Drop all publications */
 };
 
 struct LogicalRepInfo
@@ -98,6 +99,7 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_all_publications(const struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -220,6 +222,7 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -C  --cleanup-publisher-objects drop all publications on the logical replica\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1860,11 +1863,72 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
+static void
+drop_all_publications(const struct LogicalRepInfo *dbinfo)
+{
+	char	   *search_query = "SELECT pubname FROM pg_catalog.pg_publication;";
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		int			num_rows;
+		PQExpBuffer query = createPQExpBuffer();
+
+		/* Connect to the subscriber */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+
+		/* Fetch all publications */
+		res = PQexec(conn, search_query);
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_warning("could not obtain publication information: %s",
+						   PQresultErrorMessage(res));
+
+			PQclear(res);
+			disconnect_database(conn, false);
+			continue;
+		}
+
+		num_rows = PQntuples(res);
+
+		for (int j = 0; j < num_rows; j++)
+		{
+			char	   *pubname = PQgetvalue(res, j, 0);
+			PGresult   *res_for_drop;
+
+			pg_log_debug("dropping publication \"%s\"", pubname);
+
+			appendPQExpBuffer(query, "DROP PUBLICATION %s;", pubname);
+
+			if (!dry_run)
+			{
+				res_for_drop = PQexec(conn, query->data);
+
+				if (PQresultStatus(res_for_drop) != PGRES_COMMAND_OK)
+				{
+					pg_log_warning("could not drop publication \"%s\": %s",
+								   pubname, PQresultErrorMessage(res));
+				}
+
+				PQclear(res_for_drop);
+			}
+
+			resetPQExpBuffer(query);
+		}
+
+		disconnect_database(conn, false);
+		destroyPQExpBuffer(query);
+	}
+}
+
 int
 main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"cleanup-publisher-objects", no_argument, NULL, 'C'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1927,6 +1991,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.cleanup_publisher_objects = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,11 +2014,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "Cd:D:np:P:s:t:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'C':
+				opt.cleanup_publisher_objects = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2237,6 +2305,10 @@ main(int argc, char **argv)
 	/* Remove failover replication slots if they exist on subscriber */
 	drop_failover_replication_slots(dbinfo);
 
+	/* Drop publications from the subscriber if requested */
+	if (opt.cleanup_publisher_objects)
+		drop_all_publications(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b..a609807357 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -371,8 +371,18 @@ command_ok(
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
-# to show more information.
+# Create publications to test it's removal
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub FOR ALL TABLES;");
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;");
+
+# Verify the existing publications
+my $pub_count_before =
+  $node_p->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_before, '2',
+	'two publications created before --cleanup-publisher-objects is run');
+
+# Run pg_createsubscriber on node S using --cleanup-publisher-objects.
+# --verbose is used twice to show more information.
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -388,6 +398,7 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--cleanup-publisher-objects',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,6 +417,12 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+my $pub_count_after =
+  $node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_after, '0',
+	'all publications dropped after --cleanup-publisher-objects is run');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.41.0.windows.3

