From 3895daeb67d22866052a5ff9aea7d2c32c6eb78a Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 13 Feb 2025 09:25:11 +0530
Subject: [PATCH v6] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--cleanup-existing-publications' option in the
'pg_createsubscriber' utility.
This feature ensures a clean and streamlined setup of logical replication by
removing stale or unnecessary publications from the subscriber node.
These publications, replicated during streaming replication, become redundant
after converting to logical replication and serve no further purpose.
By ensuring that outdated publications are removed, it helps avoid potential
conflicts and simplifies replication management.

Since this cleanup is not required when upgrading streaming replication
clusters, it is performed only when the '--cleanup-existing-publications'
option is specified, allowing users to choose accordingly.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  10 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 120 ++++++++++++++----
 .../t/040_pg_createsubscriber.pl              |  20 ++-
 3 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 26b8e64a4e0..c2d4e376e87 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,16 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-c</option></term>
+     <term><option>--cleanup-existing-publications</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications from specified databases on the target server.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 2d881d54f5b..487105867f9 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -43,6 +43,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		cleanup_existing_publications;	/* drop all publications */
 };
 
 struct LogicalRepInfo
@@ -79,7 +80,8 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn,
+							 bool cleanup_existing_publications);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -97,7 +99,8 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo,
+							 bool cleanup_existing_publications);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -180,7 +183,7 @@ cleanup_objects_atexit(void)
 			if (conn != NULL)
 			{
 				if (dbinfo[i].made_publication)
-					drop_publication(conn, &dbinfo[i]);
+					drop_publication(conn, &dbinfo[i], false);
 				if (dbinfo[i].made_replslot)
 					drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
 				disconnect_database(conn, false);
@@ -220,6 +223,8 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -c  --cleanup-existing-publications\n"
+			 "                                  drop all publications on the subscriber\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1139,7 +1144,8 @@ check_and_drop_existing_subscriptions(PGconn *conn,
  * replication setup.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool cleanup_existing_publications)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1160,8 +1166,10 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 * Since the publication was created before the consistent LSN, it is
 		 * available on the subscriber when the physical replica is promoted.
 		 * Remove publications from the subscriber because it has no use.
+		 * Additionally, drop publications that existed before this command
+		 * if requested.
 		 */
-		drop_publication(conn, &dbinfo[i]);
+		drop_publication(conn, &dbinfo[i], cleanup_existing_publications);
 
 		create_subscription(conn, &dbinfo[i]);
 
@@ -1621,25 +1629,83 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop the publication created by this tool, or all publications in the
+ * database if cleanup_existing_publications is true.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo,
+				 bool cleanup_existing_publications)
 {
-	PQExpBuffer str = createPQExpBuffer();
+	PQExpBuffer targets = createPQExpBuffer();
 	PGresult   *res;
-	char	   *pubname_esc;
+	int			count = 0;
+	PQExpBuffer str;
 
 	Assert(conn != NULL);
 
-	pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+	/*
+	 * Gather the publications to drop. Normally the only target is the
+	 * publication created by this command, but if requested, every
+	 * publication in the database is targeted.
+	 */
+	if (cleanup_existing_publications)
+	{
+		const char *search_query =
+			"SELECT pubname FROM pg_catalog.pg_publication;";
 
-	pg_log_info("dropping publication \"%s\" in database \"%s\"",
-				dbinfo->pubname, dbinfo->dbname);
+		res = PQexec(conn, search_query);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain publication information: %s",
+						 PQresultErrorMessage(res));
 
-	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
+			PQclear(res);
+			dbinfo->made_publication = false;	/* don't try again. */
 
-	PQfreemem(pubname_esc);
+			/*
+			 * Don't disconnect and exit here. This routine is used by primary
+			 * (cleanup publication / replication slot due to an error) and
+			 * subscriber (remove the replicated publications). In both cases,
+			 * it can continue and provide instructions for the user to remove
+			 * it later if cleanup fails.
+			 */
+			return;
+		}
+
+		/* Append each publication's escaped name to the target list */
+		for (int i = 0; i < PQntuples(res); i++)
+		{
+			char	   *pubname_esc = PQescapeIdentifier(conn, PQgetvalue(res, i, 0),
+														 strlen(PQgetvalue(res, i, 0)));
+
+			if (i > 0)
+				appendPQExpBufferStr(targets, ", ");
+
+			appendPQExpBuffer(targets, "%s", pubname_esc);
+			PQfreemem(pubname_esc);
+			count++;
+		}
+
+		PQclear(res);
+	}
+	else
+	{
+		char	   *pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname,
+													 strlen(dbinfo->pubname));
+
+		appendPQExpBuffer(targets, "%s", pubname_esc);
+		PQfreemem(pubname_esc);
+
+		count++;
+	}
+
+	pg_log_info(ngettext("dropping publication %s in database \"%s\"",
+						 "dropping publications %s in database \"%s\"",
+						 count),
+				targets->data, dbinfo->dbname);
+
+	str = createPQExpBuffer();
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", targets->data);
 
 	pg_log_debug("command is: %s", str->data);
 
@@ -1648,22 +1714,19 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-			dbinfo->made_publication = false;	/* don't try again. */
+			pg_log_error(ngettext("could not drop publication %s in database \"%s\": %s",
+								  "could not drop publications %s in database \"%s\": %s",
+								  count),
+						 targets->data, dbinfo->dbname, PQresultErrorMessage(res));
+			dbinfo->made_publication = false;
 
-			/*
-			 * Don't disconnect and exit here. This routine is used by primary
-			 * (cleanup publication / replication slot due to an error) and
-			 * subscriber (remove the replicated publications). In both cases,
-			 * it can continue and provide instructions for the user to remove
-			 * it later if cleanup fails.
-			 */
+			/* Don't disconnect and exit here, for the reasons given above */
 		}
 		PQclear(res);
 	}
 
 	destroyPQExpBuffer(str);
+	destroyPQExpBuffer(targets);
 }
 
 /*
@@ -1866,6 +1929,7 @@ main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"cleanup-existing-publications", no_argument, NULL, 'c'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1933,6 +1997,7 @@ main(int argc, char **argv)
 		0
 	};
 	opt.recovery_timeout = 0;
+	opt.cleanup_existing_publications = false;
 
 	/*
 	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
@@ -1950,11 +2015,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "cd:D:np:P:s:t:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'c':
+				opt.cleanup_existing_publications = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2230,7 +2298,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	setup_subscriber(dbinfo, consistent_lsn, opt.cleanup_existing_publications);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b7..93af35daa0a 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -329,6 +329,16 @@ $node_p->safe_psql($db1,
 	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
 $node_p->wait_for_replay_catchup($node_s);
+
+# Create publications to test their removal
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub FOR ALL TABLES;");
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;");
+
+$node_p->wait_for_replay_catchup($node_s);
+
+ok( $node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
+	'two publications created before --cleanup-existing-publications is run');
+
 $node_s->stop;
 
 # dry run mode on node S
@@ -371,8 +381,8 @@ command_ok(
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
-# to show more information.
+# Run pg_createsubscriber on node S using --cleanup-existing-publications.
+# --verbose is used twice to show more information.
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -388,6 +398,7 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--cleanup-existing-publications',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,6 +417,11 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+	'0',
+	'all publications dropped after --cleanup-existing-publications is run');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.41.0.windows.3

