From 617127bbd11a2aad31231f18d53a4c54dbc802e7 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v22 2/3] pg_upgrade: Allow to replicate logical replication
 slots to new node

This commit allows nodes with logical replication slots to be upgraded. While
reading information from the old cluster, a list of logical replication slots is
newly extracted. At the later part of upgrading, pg_upgrade revisits the list
and restores slots by using the pg_create_logical_replication_slots() on the new
clushter.

Note that it must be done after the final pg_resetwal command during the upgrade
because pg_resetwal will remove WALs that are required by the slots. Due to the
restriction, the timing of restoring replication slots is different from other
objects.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei
---
 doc/src/sgml/ref/pgupgrade.sgml               |  56 ++++++++
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  70 +++++++++-
 src/bin/pg_upgrade/function.c                 |  18 ++-
 src/bin/pg_upgrade/info.c                     | 122 ++++++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               |  78 +++++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  19 +++
 .../t/003_logical_replication_slots.pl        | 123 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 10 files changed, 487 insertions(+), 6 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..848f7e8432 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -402,6 +402,62 @@ NET STOP postgresql-&majorversion;
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slot on the new publisher.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     After the upgrade is complete, execute the
+     <command>ALTER SUBSCRIPTION ... CONNECTION</command> command to update the
+     connection string, and then re-enable the subscription.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the replication slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All slots on old cluster must be usable, i.e., there are no slots which
+       <structfield>wal_status</structfield> is <literal>lost</literal> (see
+       <xref linkend="view-pg-replication-slots"/>).
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugin referred by slots on old cluster must be installed on
+       the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       New cluster must have larger
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       than existing slots on old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       New cluster must be set
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Run <application>pg_upgrade</application></title>
 
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64024e3b9e..ed5b07fbb7 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,6 +30,7 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
 
 
 /*
@@ -89,6 +90,9 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Extract a list of logical replication slots */
+	get_logical_slot_infos(&old_cluster, live_check);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -189,6 +193,17 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	/*
+	 * Checking for logical slots must be done before
+	 * check_new_cluster_is_empty() because the slot_arr attribute of the
+	 * new_cluster will be checked in that function.
+	 */
+	if (count_logical_slots(&old_cluster))
+	{
+		get_logical_slot_infos(&new_cluster, false);
+		check_for_logical_replication_slots(&new_cluster);
+	}
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -352,7 +367,9 @@ check_new_cluster_is_empty(void)
 	for (dbnum = 0; dbnum < new_cluster.dbarr.ndbs; dbnum++)
 	{
 		int			relnum;
-		RelInfoArr *rel_arr = &new_cluster.dbarr.dbs[dbnum].rel_arr;
+		DbInfo     *pDbInfo = &new_cluster.dbarr.dbs[dbnum];
+		RelInfoArr *rel_arr = &pDbInfo->rel_arr;
+		LogicalSlotInfoArr *slot_arr = &pDbInfo->slot_arr;
 
 		for (relnum = 0; relnum < rel_arr->nrels;
 			 relnum++)
@@ -360,10 +377,18 @@ check_new_cluster_is_empty(void)
 			/* pg_largeobject and its index should be skipped */
 			if (strcmp(rel_arr->rels[relnum].nspname, "pg_catalog") != 0)
 				pg_fatal("New cluster database \"%s\" is not empty: found relation \"%s.%s\"",
-						 new_cluster.dbarr.dbs[dbnum].db_name,
+						 pDbInfo->db_name,
 						 rel_arr->rels[relnum].nspname,
 						 rel_arr->rels[relnum].relname);
 		}
+
+		/*
+		 * Check the existence of logical replication slots.
+		 */
+		if (slot_arr->nslots)
+			pg_fatal("New cluster database \"%s\" is not empty: found logical replication slot \"%s\"",
+					 pDbInfo->db_name,
+					 slot_arr->slots[0].slotname);
 	}
 }
 
@@ -1402,3 +1427,44 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * Verify the parameter settings necessary for creating logical replication
+ * slots.
+ */
+static void
+check_for_logical_replication_slots(ClusterInfo *new_cluster)
+{
+	PGresult   *res;
+	PGconn	   *conn = connectToServer(new_cluster, "template1");
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* logical replication slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(new_cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking parameter settings for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (count_logical_slots(&old_cluster) > max_replication_slots)
+		pg_fatal("max_replication_slots must be greater than existing logical "
+				 "replication slots on old node.");
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	PQclear(res);
+
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..e197d1f043 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,12 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries:
+ *	1. Name of library files containing C-language functions (for non-built-in
+ *	   functions), and
+ *	2. Shared object (library) names containing the logical replication output
+ *	   plugins
+ *
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -66,14 +71,21 @@ get_loadable_libraries(void)
 		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
 
 		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
+		 * Fetch all libraries containing non-built-in C functions or referred
+		 * by logical replication slots in this DB.
 		 */
 		ress[dbnum] = executeQueryOrDie(conn,
 										"SELECT DISTINCT probin "
 										"FROM pg_catalog.pg_proc "
 										"WHERE prolang = %u AND "
 										"probin IS NOT NULL AND "
-										"oid >= %u;",
+										"oid >= %u "
+										"UNION "
+										"SELECT DISTINCT plugin "
+										"FROM pg_catalog.pg_replication_slots "
+										"WHERE wal_status <> 'lost' AND "
+										"database = current_database() AND "
+										"temporary IS FALSE;",
 										ClanguageId,
 										FirstNormalObjectId);
 		totaltups += PQntuples(ress[dbnum]);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d6..b8505ae65b 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,7 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 
 /*
@@ -394,7 +395,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +601,107 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+
+	int			num_slots;
+
+	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status <> 'lost' AND "
+							"database = current_database() AND "
+							"temporary IS FALSE;");
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher level routine to generate LogicalSlotInfoArr for all databases.
+ */
+void
+get_logical_slot_infos(ClusterInfo *cluster, bool live_check)
+{
+	int			dbnum;
+	int			slot_count = 0;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+		slot_count += pDbInfo->slot_arr.nslots;
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+}
+
+/*
+ * count_logical_slots()
+ *
+ * Sum up and return the number of logical replication slots for all databases.
+ */
+int
+count_logical_slots(ClusterInfo *cluster)
+{
+	int			dbnum;
+	int			slotnum = 0;
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+		slotnum += cluster->dbarr.dbs[dbnum].slot_arr.nslots;
+
+	return slotnum;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +712,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * doing create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.nslots == 0);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -660,3 +768,15 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_arr->slots[slotnum].slotname,
+			   slot_arr->slots[slotnum].plugin,
+			   slot_arr->slots[slotnum].two_phase);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..5bfd17160b 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (count_logical_slots(&old_cluster))
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,67 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn     *conn;
+		PQExpBuffer query;
+		int			slotnum;
+		char		log_file_name[MAXPGPATH];
+
+		/* Skip this database if there are no slots */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		query = createPQExpBuffer();
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			/*
+			 * Constructs query for creating logical replication slots.
+			 *
+			 * XXX: For simplification, pg_create_logical_replication_slot() is
+			 * used. Is it sufficient?
+			 */
+			appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteralConn(query, slot_arr->slots[slotnum].slotname,
+									conn);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteralConn(query, slot_arr->slots[slotnum].plugin,
+									conn);
+			appendPQExpBuffer(query, ", false, %s);",
+							  slot_arr->slots[slotnum].two_phase ? "true" : "false");
+
+			PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(query);
+	}
+
+	end_progress_output();
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..6ba2efe1b3 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -150,6 +150,22 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;			/* number of logical slot infos */
+	LogicalSlotInfo *slots;		/* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +192,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,6 +417,8 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+void		get_logical_slot_infos(ClusterInfo *cluster, bool live_check);
+int			count_logical_slots(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..3df2d3c284
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,123 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old node
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+$old_publisher->start;
+
+# Initialize new node
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'replica');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+$old_publisher->stop;
+
+# Create a slot on old node
+$old_publisher->start;
+$old_publisher->safe_psql(
+	'postgres', "SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# Cause a failure at the start of pg_upgrade because wal_level is replica
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with wrong wal_level');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Create an unnecessary slot on old node
+$old_publisher->start;
+$old_publisher->safe_psql(
+	'postgres', qq[
+	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
+]);
+
+$old_publisher->stop;
+
+# Preparations for the subsequent test. max_replication_slots is set to
+# smaller than existing slots on old node
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# Cause a failure at the start of pg_upgrade because the new node has
+# insufficient max_replication_slots
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new node has insufficient max_replication_slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# Remove an unnecessary slot and consume WAL records
+$old_publisher->start;
+$old_publisher->safe_psql(
+	'postgres', qq[
+	SELECT pg_drop_replication_slot('test_slot2');
+	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)
+]);
+$old_publisher->stop;
+
+# Actual run, pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot1|t), 'check the slot exists on new node');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51b7951ad8..0071efef1c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1501,7 +1501,10 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

