From 1d219210a24ad7f83b1668598de82b3b4e829a64 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 13 Feb 2025 09:25:11 +0530
Subject: [PATCH v8] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--cleanup-existing-publications' option in the
'pg_createsubscriber' utility.

This feature ensures a clean and streamlined setup of logical replication by
removing publications on the subscriber that were originally replicated from
the primary server during streaming replication.
These publications become redundant once the setup transitions to logical
replication and serve no further purpose.

The cleanup drops every publication found in the specified databases on the
subscriber at the time it runs. Because the subscriber was a physical standby
until then, these are the publications that existed on the source server and
were replicated to the subscriber.
Publications created on the subscriber afterwards are not affected.

This feature is optional and only takes effect when the
'--cleanup-existing-publications' option is explicitly specified.
Since publication cleanup is not required when upgrading streaming replication
clusters, this option provides users with the flexibility to enable or skip the
cleanup process as needed.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  11 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 120 +++++++++++++-----
 .../t/040_pg_createsubscriber.pl              |  20 ++-
 3 files changed, 117 insertions(+), 34 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index d56487fe2c..a98b583537 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,17 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-c</option></term>
+     <term><option>--cleanup-existing-publications</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications from the specified databases on the
+       target server.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 9fdf15e5ac..d8d24ab51a 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -43,6 +43,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		drop_publications;	/* drop all publications */
 };
 
 struct LogicalRepInfo
@@ -79,7 +80,8 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn,
+							 bool drop_publications);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -97,6 +99,9 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void _drop_one_publication(PGconn *conn, const char *pubname,
+								  const char *dbname);
+static void drop_all_publications(PGconn *conn, const char *dbname);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
@@ -220,6 +225,8 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -c, --cleanup-existing-publications\n"
+			 "                                  drop all publications on the subscriber\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1154,10 +1161,12 @@ check_and_drop_existing_subscriptions(PGconn *conn,
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
- * replication setup.
+ * replication setup. If 'drop_publications' is true, existing publications on
+ * the subscriber will be dropped before creating new subscriptions.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool drop_publications)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1174,10 +1183,20 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 */
 		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
 
+		/* Drop all existing publications if requested */
+		if (drop_publications)
+		{
+			pg_log_info("Dropping all existing publications in database \"%s\"",
+						dbinfo[i].dbname);
+			drop_all_publications(conn, dbinfo[i].dbname);
+		}
+
 		/*
 		 * Since the publication was created before the consistent LSN, it is
 		 * available on the subscriber when the physical replica is promoted.
 		 * Remove publications from the subscriber because it has no use.
+		 * Additionally, drop publications that existed before this command if
+		 * requested.
 		 */
 		drop_publication(conn, &dbinfo[i]);
 
@@ -1642,50 +1661,82 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
-/*
- * Remove publication if it couldn't finish all steps.
- */
+/* Drop one publication; 'pubname' must already be an escaped identifier. */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+_drop_one_publication(PGconn *conn, const char *pubname, const char *dbname)
 {
-	PQExpBuffer str = createPQExpBuffer();
+	PQExpBuffer query = createPQExpBuffer();
 	PGresult   *res;
+
+	Assert(conn != NULL);
+	Assert(pubname != NULL);
+	Assert(dbname != NULL);
+
+	appendPQExpBuffer(query, "DROP PUBLICATION %s", pubname);
+	pg_log_info("dropping publication %s in database \"%s\"", pubname, dbname);
+	pg_log_debug("command is: %s", query->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, query->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not drop publication %s in database \"%s\": %s",
+						 pubname, dbname, PQresultErrorMessage(res));
+		}
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(query);
+}
+
+/* Drop the publication named in dbinfo->pubname; the name is escaped here. */
+static void
+drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
 	char	   *pubname_esc;
 
 	Assert(conn != NULL);
+	Assert(dbinfo != NULL);
+	Assert(dbinfo->pubname != NULL);
 
 	pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
 
-	pg_log_info("dropping publication \"%s\" in database \"%s\"",
-				dbinfo->pubname, dbinfo->dbname);
-
-	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
+	_drop_one_publication(conn, pubname_esc, dbinfo->dbname);
 
 	PQfreemem(pubname_esc);
+}
 
-	pg_log_debug("command is: %s", str->data);
+/* Drop all publications in the database; errors are logged, not fatal. */
+static void
+drop_all_publications(PGconn *conn, const char *dbname)
+{
+	PGresult   *res;
+	int			ntups;
 
-	if (!dry_run)
+	Assert(conn != NULL);
+	Assert(dbname != NULL);
+
+	res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		res = PQexec(conn, str->data);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		{
-			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-			dbinfo->made_publication = false;	/* don't try again. */
-
-			/*
-			 * Don't disconnect and exit here. This routine is used by primary
-			 * (cleanup publication / replication slot due to an error) and
-			 * subscriber (remove the replicated publications). In both cases,
-			 * it can continue and provide instructions for the user to remove
-			 * it later if cleanup fails.
-			 */
-		}
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
 		PQclear(res);
+		return;
 	}
 
-	destroyPQExpBuffer(str);
+	ntups = PQntuples(res);
+	for (int i = 0; i < ntups; i++)
+	{
+		const char *pubname = PQgetvalue(res, i, 0);
+		char	   *pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
+
+		_drop_one_publication(conn, pubname_esc, dbname);
+		PQfreemem(pubname_esc);
+	}
+
+	PQclear(res);
 }
 
 /*
@@ -1888,6 +1939,7 @@ main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"cleanup-existing-publications", no_argument, NULL, 'c'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1955,6 +2007,7 @@ main(int argc, char **argv)
 		0
 	};
 	opt.recovery_timeout = 0;
+	opt.drop_publications = false;
 
 	/*
 	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
@@ -1972,11 +2025,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "cd:D:np:P:s:t:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'c':
+				opt.drop_publications = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2252,7 +2308,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	setup_subscriber(dbinfo, consistent_lsn, opt.drop_publications);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b..3301b1425f 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -329,6 +329,16 @@ $node_p->safe_psql($db1,
 	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
 $node_p->wait_for_replay_catchup($node_s);
+
+# Create publications to test their removal
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub FOR ALL TABLES;");
+$node_p->safe_psql($db1, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;");
+
+$node_p->wait_for_replay_catchup($node_s);
+
+is( $node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication"), '2',
+	'two publications created before --cleanup-existing-publications is run');
+
 $node_s->stop;
 
 # dry run mode on node S
@@ -371,8 +381,8 @@ command_ok(
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
-# to show more information.
+# Run pg_createsubscriber on node S using --cleanup-existing-publications.
+# --verbose is used twice to show more information.
 command_ok(
 	[
 		'pg_createsubscriber',
@@ -388,6 +398,7 @@ command_ok(
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--cleanup-existing-publications',
 	],
 	'run pg_createsubscriber on node S');
 
@@ -406,6 +417,11 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm publications are removed from the subscriber node
+is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+	'0',
+	'all publications dropped after --cleanup-existing-publications is run');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.34.1

