From 99b89173587a8ce35545bbd02e9eecebf029fd13 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v24 2/3] pg_upgrade: Allow to replicate logical replication
 slots to new node

This commit allows nodes with logical replication slots to be upgraded. While
reading information from the old cluster, a list of logical replication slots is
fetched. At the later part of upgrading, pg_upgrade revisits the list and
restores slots by executing pg_create_logical_replication_slots() on the new
cluster.

Note that slot restoration must be done after the final pg_resetwal command
during the upgrade because pg_resetwal will remove WALs that are required by
the slots. Due to this restriction, the timing of restoring replication slots is
different from other objects.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei, Masahiko Sawada
---
 doc/src/sgml/ref/pgupgrade.sgml               |  62 ++++++++
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  65 ++++++++
 src/bin/pg_upgrade/function.c                 |  18 ++-
 src/bin/pg_upgrade/info.c                     | 132 ++++++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               |  78 ++++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  19 +++
 .../t/003_logical_replication_slots.pl        | 139 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 10 files changed, 516 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..95d1d3ced8 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -360,6 +360,68 @@ make prefix=/usr/local/pgsql.new install
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> attempts to migrate logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slots on the new publisher.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher cluster, ensure that the
+     subscription is temporarily disabled, by executing
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
+     Re-enable the subscription after the upgrade.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the replication slots. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All slots on the old cluster must be usable, i.e., there are no slots
+       whose <structfield>wal_status</structfield> is <literal>lost</literal> (see
+       <xref linkend="view-pg-replication-slots"/>).
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       <structfield>confirmed_flush_lsn</structfield> (see <xref linkend="view-pg-replication-slots"/>)
+       of all slots on the old cluster must be the same as the latest
+       checkpoint location.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The output plugins referenced by the slots on the old cluster must be
+       installed in the new PostgreSQL executable directory.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+       configured to a value greater than or equal to the number of slots
+       present in the old cluster.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The new cluster must have
+       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+       <literal>logical</literal>.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+   </step>
+
    <step>
     <title>Stop both servers</title>
 
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64024e3b9e..61a81c5011 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,6 +30,7 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_new_cluster_logical_replication_slots(void);
 
 
 /*
@@ -89,6 +90,9 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Extract a list of logical replication slots */
+	get_logical_slot_infos(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -189,6 +193,10 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	/* Logical replication slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(new_cluster.major_version) >= 1700)
+		check_new_cluster_logical_replication_slots();
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -1402,3 +1410,60 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * check_new_cluster_logical_replication_slots()
+ *
+ * Make sure there are no logical replication slots on the new cluster and that
+ * the parameter settings necessary for creating slots are sufficient.
+ */
+static void
+check_new_cluster_logical_replication_slots(void)
+{
+	PGresult   *res;
+	PGconn	   *conn;
+	int			nslots = count_logical_slots(&old_cluster);
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* Quick exit if there are no logical slots on the old cluster */
+	if (nslots == 0)
+		return;
+
+	conn = connectToServer(&new_cluster, "template1");
+
+	prep_status("Checking for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SELECT slot_name "
+								  "FROM pg_catalog.pg_replication_slots "
+								  "WHERE slot_type = 'logical' AND "
+								  "temporary IS FALSE;");
+
+	if (PQntuples(res))
+		pg_fatal("New cluster must not have logical replication slots but found \"%s\"",
+				 PQgetvalue(res, 0, 0));
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (nslots > max_replication_slots)
+		pg_fatal("max_replication_slots (%d) must be greater than or equal to the number of "
+				 "logical replication slots (%d) on the old cluster.",
+				 max_replication_slots, nslots);
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				wal_level);
+
+	PQclear(res);
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..2813d2ff20 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,12 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries:
+ *	1. Name of library files containing C-language functions (for non-built-in
+ *	   functions), and
+ *	2. Shared object (library) names containing the logical replication output
+ *	   plugins
+ *
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -66,14 +71,21 @@ get_loadable_libraries(void)
 		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
 
 		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
+		 * Fetch all libraries containing non-built-in C functions, or referred
+		 * to by logical replication slots in this DB.
 		 */
 		ress[dbnum] = executeQueryOrDie(conn,
 										"SELECT DISTINCT probin "
 										"FROM pg_catalog.pg_proc "
 										"WHERE prolang = %u AND "
 										"probin IS NOT NULL AND "
-										"oid >= %u;",
+										"oid >= %u "
+										"UNION "
+										"SELECT DISTINCT plugin "
+										"FROM pg_catalog.pg_replication_slots "
+										"WHERE wal_status <> 'lost' AND "
+										"database = current_database() AND "
+										"temporary IS FALSE;",
 										ClanguageId,
 										FirstNormalObjectId);
 		totaltups += PQntuples(ress[dbnum]);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index aa5faca4d6..59ccc01b57 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,7 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 
 /*
@@ -394,7 +395,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +601,113 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+
+	int			num_slots;
+
+	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase "
+							"FROM pg_catalog.pg_replication_slots "
+							"WHERE wal_status <> 'lost' AND "
+							"database = current_database() AND "
+							"temporary IS FALSE;");
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher level routine to generate LogicalSlotInfoArr for all databases.
+ */
+void
+get_logical_slot_infos(ClusterInfo *cluster)
+{
+	int			dbnum;
+
+	/* Logical slots can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+}
+
+/*
+ * count_logical_slots()
+ *
+ * Sum up and return the number of logical replication slots for all databases.
+ */
+int
+count_logical_slots(ClusterInfo *cluster)
+{
+	int			dbnum;
+	int			slot_count = 0;
+
+	/* Quick exit if the version is prior to PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return 0;
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+		slot_count += cluster->dbarr.dbs[dbnum].slot_arr.nslots;
+
+	return slot_count;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +718,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.nslots == 0);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -660,3 +774,19 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+	{
+		LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_info->slotname,
+			   slot_info->plugin,
+			   slot_info->two_phase);
+	}
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..f3d3991bef 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots.
+	 *
+	 * Note: This must be done after doing the pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (count_logical_slots(&old_cluster))
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,67 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PGconn     *conn;
+		PQExpBuffer query;
+		int			slotnum;
+		char		log_file_name[MAXPGPATH];
+
+		/* Skip this database if there are no slots */
+		if (slot_arr->nslots == 0)
+			continue;
+
+		conn = connectToServer(&new_cluster, old_db->db_name);
+		query = createPQExpBuffer();
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			LogicalSlotInfo *slot_info = &slot_arr->slots[slotnum];
+
+			/*
+			 * Constructs a query for creating logical replication slots.
+			 *
+			 * XXX: For simplification, pg_create_logical_replication_slot() is
+			 * used. Is it sufficient?
+			 */
+			appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteralConn(query, slot_info->slotname, conn);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteralConn(query, slot_info->plugin, conn);
+			appendPQExpBuffer(query, ", false, %s);",
+							  slot_info->two_phase ? "true" : "false");
+
+			PQclear(executeQueryOrDie(conn, "%s", query->data));
+
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(query);
+	}
+
+	end_progress_output();
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..2dac266537 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -150,6 +150,22 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;			/* number of logical slot infos */
+	LogicalSlotInfo *slots;		/* array of logical slot infos */
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +192,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,6 +417,8 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+void		get_logical_slot_infos(ClusterInfo *cluster);
+int			count_logical_slots(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..ae87c33708
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old cluster
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+
+# Initialize new cluster
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'replica');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when new cluster wal_level is not 'logical'
+
+# Preparations for the subsequent test:
+# 1. Create a slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# pg_upgrade will fail because the new cluster wal_level is 'replica'
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has the wrong wal_level');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Confirm pg_upgrade fails when max_replication_slots on a new cluster is
+#		too low
+
+# Preparations for the subsequent test:
+# 1. Create a second slot on the old cluster
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);"
+);
+$old_publisher->stop;
+
+# 2. max_replication_slots is set to smaller than the number of slots (2)
+#	 present on the old cluster
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# 3. wal_level is set correctly on the new cluster
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+
+# pg_upgrade will fail because the new cluster has insufficient max_replication_slots
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade where the new cluster has insufficient max_replication_slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------
+# TEST: Successful upgrade
+
+# Preparations for the subsequent test:
+# 1. Remove the slot 'test_slot2', leaving only 1 slot remaining on the old
+#	 cluster, so the new cluster config  max_replication_slots=1 will now be
+#	 enough.
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT * FROM pg_drop_replication_slot('test_slot2');"
+);
+
+# 2. Consume WAL records
+$old_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('test_slot1', NULL, NULL)"
+);
+$old_publisher->stop;
+
+# Actual run, successful upgrade is expected
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old cluster');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check that the slot 'test_slot1' has migrated to the new cluster
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot1|t), 'check the slot exists on new cluster');
+$new_publisher->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51b7951ad8..0071efef1c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1501,7 +1501,10 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

