From 1dc2abc77278b6847f016348fedbd4d060fe6a97 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 23 Jan 2025 12:39:03 +0530
Subject: [PATCH v3] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--cleanup-publisher-objects' option in the
'pg_createsubscriber' utility.
This feature ensures a clean and streamlined setup of logical replication by
removing stale or unnecessary publications from the subscriber node.
These publications, replicated during streaming replication, become redundant
after converting to logical replication and serve no further purpose.
By ensuring that outdated publications are removed, it helps avoid potential
conflicts and simplifies replication management.

A new 'drop_all_publications()' function is added to fetch and drop all
publications on the subscriber node.
Since this cleanup is not required when upgrading streaming replication
clusters, this feature is supported only when the '--cleanup-publisher-objects'
option is specified, allowing users to choose accordingly.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 11 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 77 ++++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              | 23 +++++-
 3 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 26b8e64a4e..7c45fe960e 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,17 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-C</option></term>
+     <term><option>--cleanup-publisher-objects</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications on the subscriber node before creating
+       a subscription.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index faf18ccf13..d4cb3ebd36 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -43,6 +43,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		cleanup_publisher_objects;	/* drop all publications */
 };
 
 struct LogicalRepInfo
@@ -98,6 +99,7 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_all_publications(const struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -220,6 +222,7 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -C, --cleanup-publisher-objects drop all publications on the logical replica\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1860,11 +1863,75 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
+/*
+ * Drop every publication found in each of the num_dbs subscriber databases
+ * described by dbinfo.
+ *
+ * Failures are reported as warnings only, so one bad database or publication
+ * does not stop the rest.  In dry-run mode nothing is actually dropped.
+ */
+static void
+drop_all_publications(const struct LogicalRepInfo *dbinfo)
+{
+	const char *search_query = "SELECT pubname FROM pg_catalog.pg_publication;";
+	PQExpBuffer query = createPQExpBuffer();
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+
+		/* Connect to the subscriber */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+
+		/* Fetch all publications */
+		res = PQexec(conn, search_query);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_warning("could not obtain publication information: %s",
+						   PQresultErrorMessage(res));
+			PQclear(res);
+			disconnect_database(conn, false);
+			continue;
+		}
+
+		for (int j = 0; j < PQntuples(res); j++)
+		{
+			char	   *pubname = PQgetvalue(res, j, 0);
+			char	   *pubname_esc;
+			PGresult   *res_for_drop;
+
+			pg_log_debug("dropping publication \"%s\"", pubname);
+			/* Quote the name so mixed-case or special characters survive */
+			pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
+			resetPQExpBuffer(query);
+			appendPQExpBuffer(query, "DROP PUBLICATION %s;", pubname_esc);
+			PQfreemem(pubname_esc);
+
+			if (!dry_run)
+			{
+				res_for_drop = PQexec(conn, query->data);
+				/* Report the DROP's own error, not the SELECT result's */
+				if (PQresultStatus(res_for_drop) != PGRES_COMMAND_OK)
+					pg_log_warning("could not drop publication \"%s\": %s",
+								   pubname, PQresultErrorMessage(res_for_drop));
+				PQclear(res_for_drop);
+			}
+		}
+
+		PQclear(res);
+		disconnect_database(conn, false);
+	}
+
+	destroyPQExpBuffer(query);
+}
+
 int
 main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"cleanup-publisher-objects", no_argument, NULL, 'C'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1927,6 +1994,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.cleanup_publisher_objects = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,11 +2017,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "Cd:D:np:P:s:t:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'C':
+				opt.cleanup_publisher_objects = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2237,6 +2308,10 @@ main(int argc, char **argv)
 	/* Remove failover replication slots if they exist on subscriber */
 	drop_failover_replication_slots(dbinfo);
 
+	/* Drop publications from the subscriber if requested */
+	if (opt.cleanup_publisher_objects)
+		drop_all_publications(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b..9f2543bf7a 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -371,8 +371,20 @@ command_ok(
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
-# to show more information.
+$node_s->start;
+# Create publications to test their removal
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub FOR ALL TABLES;");
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Verify the publications have been replayed on the standby
+my $pub_count_before =
+  $node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_before, '2',
+	'two publications created before --cleanup-publisher-objects is run');
+$node_s->stop;
+# Run pg_createsubscriber on node S using --cleanup-publisher-objects.
+# --verbose is used twice to show more information.
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -388,6 +400,7 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--cleanup-publisher-objects',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,6 +419,12 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+my $pub_count_after =
+  $node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_after, '0',
+	'all publications dropped after --cleanup-publisher-objects is run');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.34.1

