On Fri, 21 Jul 2023 at 13:00, Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear hackers,
>
> > Based on the above, we are considering delaying the shutdown of logical
> > walsenders. The preliminary workflow is:
> >
> > 1. When a logical walsender receives the signal from the checkpointer, it
> >    consumes all remaining WAL records, changes its state to
> >    WALSNDSTATE_STOPPING, and stops doing anything further.
> > 2. The checkpointer then performs the shutdown checkpoint.
> > 3. After that, the postmaster sends a signal to the walsenders, as in the
> >    current implementation.
> > 4. Finally, the logical walsenders process the shutdown checkpoint record
> >    and update confirmed_lsn after the acknowledgement from the subscriber.
> >    Note that logical walsenders do not have to send the shutdown checkpoint
> >    record to the subscriber; the following keepalive will advance
> >    confirmed_lsn.
> > 5. Once all tasks are done, they exit.
> >
> > This mechanism ensures that the confirmed_lsn of active slots is the same
> > as the current WAL location of the old publisher, so the 0003 patch becomes
> > simpler: we no longer have to calculate an acceptable difference.
> >
> > One thing we must consider is that no WAL must be generated while decoding
> > the shutdown checkpoint record, as that would cause a PANIC. IIUC the
> > record leads to SnapBuildSerializationPoint(), which just serializes the
> > snapbuild or restores from it, so the change may be acceptable. Thoughts?
>
> I've implemented the ideas from my previous proposal; PSA another patch set.
> Patch 0001 introduces the state WALSNDSTATE_STOPPING for logical walsenders.
> The workflow remains largely the same as described in my previous post, with
> the following additions:
>
> * A flag has been added to track whether all WAL has been flushed. The
>   logical walsender can only exit after the flag is set. This ensures that
>   all WAL is flushed before the walsender terminates.
> * Cumulative statistics are now forcibly written before changing the state.
>   While the previous approach reported stats upon process exit, the current
>   approach must report them earlier because of the checkpointer's
>   termination timing. See the comments in CheckpointerMain() and atop
>   pgstat_before_server_shutdown().
> * At process exit, slots are now saved to disk.
>
>
> Patch 0002 adds the --include-logical-replication-slots option to
> pg_upgrade, unchanged from the previous set.
>
> Patch 0003 adds a check function, which has become simpler.
> The previous version calculated the "acceptable" difference between
> confirmed_lsn and the current WAL position. This was necessary because
> shutdown records could not be sent to subscribers, creating a disparity
> between these values. However, this approach had drawbacks, such as needing
> adjustments if record sizes changed.
>
> Now the record can be sent to subscribers, so that hack is no longer needed,
> at least in the context of logical replication. The consistency is now
> maintained by the logical walsenders, so slots created directly by a backend
> cannot be handled the same way.
> We must consider what should be...
>
> What do you think?
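For reference, the quoted shutdown workflow can be modelled roughly as below. This is an illustrative sketch only; the type and function names are invented for the example and are not the actual PostgreSQL identifiers:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified model of the proposed logical walsender
 * shutdown sequence. Names are illustrative, not PostgreSQL source. */
typedef enum
{
	WALSND_MODEL_STREAMING,
	WALSND_MODEL_STOPPING,		/* all WAL up to the checkpoint consumed */
	WALSND_MODEL_EXITED
} WalSndModelState;

typedef struct
{
	WalSndModelState state;
	bool		all_wal_flushed;	/* step 1: everything sent and flushed */
	bool		ckpt_confirmed;		/* step 4: subscriber acknowledged */
} WalSndModel;

/* Step 1: on the checkpointer's signal, consume remaining WAL and stop. */
static void
model_receive_stop_signal(WalSndModel *ws)
{
	ws->all_wal_flushed = true;
	ws->state = WALSND_MODEL_STOPPING;
}

/*
 * Steps 3-5: after the shutdown checkpoint, process its record and exit,
 * but only once every prior record has been flushed and confirmed.
 */
static bool
model_try_exit(WalSndModel *ws)
{
	if (ws->state != WALSND_MODEL_STOPPING || !ws->all_wal_flushed)
		return false;
	ws->ckpt_confirmed = true;	/* keepalive advances confirmed_lsn */
	ws->state = WALSND_MODEL_EXITED;
	return true;
}
```

The point of the model is the ordering guarantee: exit is refused until the stop signal has been seen and all WAL is flushed, which is what keeps confirmed_lsn equal to the old publisher's WAL end.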

Here is a patch which checks that there are no WAL records, other than the
CHECKPOINT_SHUTDOWN record, left to be consumed, based on the discussion
in [1].
Patches 0001 and 0002 are the same as those posted by Kuroda-san. Patch
0003 exposes pg_get_wal_records_content to get the WAL records, along
with the WAL record type, between a start and end LSN. The pg_walinspect
contrib module already exposes a function for this requirement; I have
moved this functionality so that it is exposed from the backend. Patch
0004 slightly changes the check function to verify that there are no
records other than CHECKPOINT_SHUTDOWN left to be consumed.
Thoughts?
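The pass/fail criterion of that check can be sketched as follows (an illustrative standalone function, not the pg_upgrade code, which performs the equivalent test in SQL over pg_get_wal_records_content()):

```c
#include <stdbool.h>
#include <string.h>

/*
 * Illustrative sketch of the new check: a slot passes only if every WAL
 * record between its confirmed_flush_lsn and the current insert position
 * is the shutdown checkpoint record, i.e. nothing else remains undecoded.
 */
static bool
slot_fully_consumed(const char *const record_types[], int nrecords)
{
	for (int i = 0; i < nrecords; i++)
	{
		if (strcmp(record_types[i], "CHECKPOINT_SHUTDOWN") != 0)
			return false;	/* some record was never sent to the subscriber */
	}
	return true;
}
```

With this criterion, a slot whose only remaining record is CHECKPOINT_SHUTDOWN is considered up to date, while any other record type makes pg_upgrade fail the check.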

[1] - 
https://www.postgresql.org/message-id/CAA4eK1Kem-J5NM7GJCgyKP84pEN6RsG6JWo%3D6pSn1E%2BiexL1Fw%40mail.gmail.com

Regards,
Vignesh
From 49360788fea01756aa9578da3dad34786bd1281a Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
Subject: [PATCH 4/4] pg_upgrade: Add check function for
 --include-logical-replication-slots option

XXX: Actually, this commit does not support slots which are created by user
backends. The check function ensures that all active slots have a
confirmed_flush_lsn equal to the current WAL position, and for such slots the
two would not match. For slots used by logical replication, the logical
walsenders guarantee this at shutdown. Individual slots, however, cannot be
handled by the walsenders, so their confirmed_flush_lsn lags behind the
shutdown checkpoint record.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C
---
 src/bin/pg_upgrade/check.c                    |  59 ++++++++
 .../t/003_logical_replication_slots.pl        | 134 +++++++++++-------
 2 files changed, 140 insertions(+), 53 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 4b54f5567c..264895721a 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -31,6 +31,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 static int num_slots_on_old_cluster;
 
@@ -109,6 +110,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (user_opts.include_logical_slots)
+		check_for_confirmed_flush_lsn(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
@@ -1478,3 +1481,59 @@ check_for_logical_replication_slots(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	Assert(user_opts.include_logical_slots);
+
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1500)
+		return;
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the current WAL
+	 * position.
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE (SELECT count(record_type) "
+							"		FROM pg_catalog.pg_get_wal_records_content(confirmed_flush_lsn, pg_catalog.pg_current_wal_insert_lsn()) "
+							"		WHERE record_type != 'CHECKPOINT_SHUTDOWN') <> 0 "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		is_error = true;
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (is_error)
+		pg_fatal("--include-logical-replication-slots requires that all "
+				 "logical replication slots have consumed all the WAL");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 9ca266f6b2..87049d7116 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -15,132 +15,160 @@ use Test::More;
 my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 
 # Initialize old node
-my $old_node = PostgreSQL::Test::Cluster->new('old_node');
-$old_node->init(allows_streaming => 'logical');
-$old_node->start;
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
 
 # Initialize new node
-my $new_node = PostgreSQL::Test::Cluster->new('new_node');
-$new_node->init(allows_streaming => 1);
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 1);
 
-my $bindir = $new_node->config_data('--bindir');
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_node->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # Cause a failure at the start of pg_upgrade because wal_level is replica
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with wrong wal_level');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. The case max_replication_slots is set
 # to 0 is prohibited.
-$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 0");
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is 0
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with wrong max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # non-zero value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# Create a slot on old node, and generate WALs
-$old_node->start;
-$old_node->safe_psql(
+# Create a slot on old node
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
-	SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true);
-	SELECT pg_create_logical_replication_slot('to_be_dropped', 'test_decoding', false, true);
+	SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);
+	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
 	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 ]);
 
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is
 # smaller than existing slots on old node
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with small max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # appropriate value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 10");
 
-# Remove an unnecessary slot and consume WALs
-$old_node->start;
-$old_node->safe_psql(
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
-	SELECT pg_drop_replication_slot('to_be_dropped');
-	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)
+	SELECT pg_drop_replication_slot('test_slot1');
+	SELECT pg_drop_replication_slot('test_slot2');
 ]);
-$old_node->stop;
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+$old_publisher->stop;
 
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots'
 	],
 	'run of pg_upgrade of old node');
-ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ removed after pg_upgrade success");
 
-$new_node->start;
-my $result = $new_node->safe_psql('postgres',
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot|t), 'check the slot exists on new node');
+is($result, qq(sub|t), 'check the slot exists on new node');
+
+# Change the connection
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.34.1

From 8cb9043ece78ba5235b11b7277b608a37044c559 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH 2/3] pg_upgrade: Add --include-logical-replication-slots
 option

This commit introduces a new pg_upgrade option called "--include-logical-replication-slots".
This allows nodes with logical replication slots to be upgraded. The commit can
be divided into two parts: one for pg_dump and another for pg_upgrade.

For pg_dump this commit includes a new option called "--logical-replication-slots-only".
This option can be used to dump logical replication slots. When this option is
specified, the slot_name, plugin, and two_phase parameters are extracted from
pg_replication_slots. An SQL file that executes pg_create_logical_replication_slot()
with the extracted parameters is generated.

For pg_upgrade, when '--include-logical-replication-slots' is specified, it executes
pg_dump with the new "--logical-replication-slots-only" option and restores the slots
using the pg_create_logical_replication_slot() statements that the dump
generated (see above). Note that we cannot dump replication slots at the same time
as the schema dump because we need to separate the timing of restoring replication
slots and other objects. Replication slots, in particular, should not be restored
before executing the pg_resetwal command because it will remove WALs that are
required by the slots.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously, pg_upgrade
allowed copying publications to a new node. With this new commit, adjusting the
connection string to the new publisher will cause the apply worker on the subscriber
to connect to the new publisher automatically. This enables seamless continuation
of logical replication, even after an upgrade.

Author: Hayato Kuroda
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei
---
 doc/src/sgml/ref/pgupgrade.sgml               |  11 ++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     | 155 ++++++++++++++++++
 src/bin/pg_dump/pg_dump.h                     |  14 +-
 src/bin/pg_dump/pg_dump_sort.c                |  11 +-
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  76 +++++++++
 src/bin/pg_upgrade/dump.c                     |  24 +++
 src/bin/pg_upgrade/info.c                     | 111 ++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/option.c                   |   7 +
 src/bin/pg_upgrade/pg_upgrade.c               |  61 +++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  21 +++
 .../t/003_logical_replication_slots.pl        | 146 +++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 15 files changed, 641 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..94e90ff506 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--include-logical-replication-slots</option></term>
+      <listitem>
+       <para>
+        Upgrade logical replication slots. Only permanent replication slots
+        are included. Note that pg_upgrade does not check the installation of
+        plugins.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..0a4e931f9b 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -187,6 +187,7 @@ typedef struct _dumpOptions
 	int			use_setsessauth;
 	int			enable_row_security;
 	int			load_via_partition_root;
+	int			logical_slots_only;
 
 	/* default, if no "inclusion" switches appear, is to dump everything */
 	bool		include_everything;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 5dab1ba9ea..12d4066d3b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -328,6 +328,9 @@ static void setupDumpWorker(Archive *AH);
 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
 
+static void getLogicalReplicationSlots(Archive *fout);
+static void dumpLogicalReplicationSlot(Archive *fout,
+									   const LogicalReplicationSlotInfo *slotinfo);
 
 int
 main(int argc, char **argv)
@@ -431,6 +434,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
+		{"logical-replication-slots-only", no_argument, NULL, 15},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -657,6 +661,10 @@ main(int argc, char **argv)
 										  optarg);
 				break;
 
+			case 15:			/* dump only replication slot(s) */
+				dopt.logical_slots_only = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -714,6 +722,21 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	if (dopt.logical_slots_only)
+	{
+		if (!dopt.binary_upgrade)
+			pg_fatal("option --logical-replication-slots-only requires option --binary-upgrade");
+
+		if (dopt.dataOnly)
+			pg_fatal("options --logical-replication-slots-only and -a/--data-only cannot be used together");
+
+		if (dopt.schemaOnly)
+			pg_fatal("options --logical-replication-slots-only and -s/--schema-only cannot be used together");
+
+		if (dopt.outputClean)
+			pg_fatal("options --logical-replication-slots-only and -c/--clean cannot be used together");
+	}
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -876,6 +899,16 @@ main(int argc, char **argv)
 			pg_fatal("no matching extensions were found");
 	}
 
+	/*
+	 * If dump logical-replication-slots-only was requested, dump only them
+	 * and skip everything else.
+	 */
+	if (dopt.logical_slots_only)
+	{
+		getLogicalReplicationSlots(fout);
+		goto dump;
+	}
+
 	/*
 	 * Dumping LOs is the default for dumps where an inclusion switch is not
 	 * used (an "include everything" dump).  -B can be used to exclude LOs
@@ -936,6 +969,8 @@ main(int argc, char **argv)
 	if (!dopt.no_security_labels)
 		collectSecLabels(fout);
 
+dump:
+
 	/* Lastly, create dummy objects to represent the section boundaries */
 	boundaryObjs = createBoundaryObjects();
 
@@ -1109,6 +1144,12 @@ help(const char *progname)
 			 "                               servers matching PATTERN\n"));
 	printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
 	printf(_("  --load-via-partition-root    load partitions via the root table\n"));
+
+	/*
+	 * The option --logical-replication-slots-only is used only by pg_upgrade
+	 * and should not be used by users, which is why it is not listed in the
+	 * help output.
+	 */
 	printf(_("  --no-comments                do not dump comments\n"));
 	printf(_("  --no-publications            do not dump publications\n"));
 	printf(_("  --no-security-labels         do not dump security label assignments\n"));
@@ -10237,6 +10278,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			dumpLogicalReplicationSlot(fout,
+									   (const LogicalReplicationSlotInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18218,6 +18263,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_LOGICAL_REPLICATION_SLOT:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
@@ -18479,3 +18525,112 @@ appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
 	if (!res)
 		pg_log_warning("could not parse %s array", "reloptions");
 }
+
+/*
+ * getLogicalReplicationSlots
+ *	  get information about replication slots
+ */
+static void
+getLogicalReplicationSlots(Archive *fout)
+{
+	PGresult   *res;
+	LogicalReplicationSlotInfo *slotinfo;
+	PQExpBuffer query;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+	int			i,
+				ntups;
+
+	/* Check whether we should dump or not */
+	if (fout->remoteVersion < 170000)
+		return;
+
+	Assert(fout->dopt->logical_slots_only);
+
+	query = createPQExpBuffer();
+
+	resetPQExpBuffer(query);
+
+	/*
+	 * Get replication slots.
+	 *
+	 * XXX: Which information must be extracted from old node? Currently three
+	 * attributes are extracted because they are used by
+	 * pg_create_logical_replication_slot().
+	 */
+	appendPQExpBufferStr(query,
+						 "SELECT slot_name, plugin, two_phase "
+						 "FROM pg_catalog.pg_replication_slots "
+						 "WHERE database = current_database() AND temporary = false "
+						 "AND wal_status IN ('reserved', 'extended');");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	slotinfo = pg_malloc(ntups * sizeof(LogicalReplicationSlotInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		slotinfo[i].dobj.objType = DO_LOGICAL_REPLICATION_SLOT;
+
+		slotinfo[i].dobj.catId.tableoid = InvalidOid;
+		slotinfo[i].dobj.catId.oid = InvalidOid;
+		AssignDumpId(&slotinfo[i].dobj);
+
+		slotinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_slotname));
+
+		slotinfo[i].plugin = pg_strdup(PQgetvalue(res, i, i_plugin));
+		slotinfo[i].twophase = (strcmp(PQgetvalue(res, i, i_twophase), "t") == 0);
+
+		/*
+		 * Note: Currently we do not have any options to include/exclude slots
+		 * in dumping, so all the slots must be selected.
+		 */
+		slotinfo[i].dobj.dump = DUMP_COMPONENT_DEFINITION;
+	}
+	PQclear(res);
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpLogicalReplicationSlot
+ *	  dump creation functions for the given logical replication slots
+ */
+static void
+dumpLogicalReplicationSlot(Archive *fout,
+						   const LogicalReplicationSlotInfo *slotinfo)
+{
+	Assert(fout->dopt->logical_slots_only);
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		PQExpBuffer query = createPQExpBuffer();
+
+		/*
+		 * XXX: For simplification, pg_create_logical_replication_slot() is
+		 * used. Is it sufficient?
+		 */
+		appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+		appendStringLiteralAH(query, slotinfo->dobj.name, fout);
+		appendPQExpBuffer(query, ", ");
+		appendStringLiteralAH(query, slotinfo->plugin, fout);
+		appendPQExpBuffer(query, ", false, %s);",
+						  slotinfo->twophase ? "true" : "false");
+
+		ArchiveEntry(fout, slotinfo->dobj.catId, slotinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = slotinfo->dobj.name,
+								  .description = "REPLICATION SLOT",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+		destroyPQExpBuffer(query);
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index bc8f2ec36d..ed1866d9ab 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_LOGICAL_REPLICATION_SLOT
 } DumpableObjectType;
 
 /*
@@ -667,6 +668,17 @@ typedef struct _SubscriptionInfo
 	char	   *subpasswordrequired;
 } SubscriptionInfo;
 
+/*
+ * The LogicalReplicationSlotInfo struct is used to represent replication
+ * slots.
+ */
+typedef struct _LogicalReplicationSlotInfo
+{
+	DumpableObject dobj;
+	char	   *plugin;
+	bool		twophase;
+} LogicalReplicationSlotInfo;
+
 /*
  *	common utility functions
  */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 523a19c155..ae65443228 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -93,6 +93,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_LOGICAL_REPLICATION_SLOT,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last! */
@@ -146,10 +147,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_LOGICAL_REPLICATION_SLOT	/* DO_LOGICAL_REPLICATION_SLOT */
 };
 
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_LOGICAL_REPLICATION_SLOT + 1),
 				 "array length mismatch");
 
 static DumpId preDataBoundId;
@@ -1542,6 +1544,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			snprintf(buf, bufsize,
+					 "LOGICAL REPLICATION SLOT (ID %d NAME %s)",
+					 obj->dumpId, obj->name);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY  (ID %d)",
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64024e3b9e..4b54f5567c 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,7 +30,9 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
 
+static int num_slots_on_old_cluster;
 
 /*
  * fix_path_separator
@@ -89,6 +91,10 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Additionally, extract a list of logical replication slots if required */
+	if (user_opts.include_logical_slots)
+		num_slots_on_old_cluster = get_logical_slot_infos(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -189,6 +195,17 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	/*
+	 * Do additional work if --include-logical-replication-slots was specified.
+	 * This must be done before check_new_cluster_is_empty() because the
+	 * slot_arr attribute of the new_cluster will be checked in that function.
+	 */
+	if (user_opts.include_logical_slots)
+	{
+		(void) get_logical_slot_infos(&new_cluster);
+		check_for_logical_replication_slots(&new_cluster);
+	}
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -364,6 +381,22 @@ check_new_cluster_is_empty(void)
 						 rel_arr->rels[relnum].nspname,
 						 rel_arr->rels[relnum].relname);
 		}
+
+		/*
+		 * If --include-logical-replication-slots was specified, check that
+		 * no logical replication slots already exist on the new cluster.
+		 */
+		if (user_opts.include_logical_slots)
+		{
+			DbInfo *pDbInfo = &new_cluster.dbarr.dbs[dbnum];
+			LogicalSlotInfoArr *slot_arr = &pDbInfo->slot_arr;
+
+			/* if nslots > 0, report just first entry and exit */
+			if (slot_arr->nslots)
+				pg_fatal("New cluster database \"%s\" is not empty: found logical replication slot \"%s\"",
+						 pDbInfo->db_name,
+						 slot_arr->slots[0].slotname);
+		}
 	}
 }
 
@@ -1402,3 +1435,46 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * Verify the parameter settings necessary for creating logical replication
+ * slots.
+ */
+static void
+check_for_logical_replication_slots(ClusterInfo *new_cluster)
+{
+	PGresult   *res;
+	PGconn	   *conn = connectToServer(new_cluster, "template1");
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* --include-logical-replication-slots can be used since PG17. */
+	if (GET_MAJOR_VERSION(new_cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking parameter settings for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (max_replication_slots == 0)
+		pg_fatal("max_replication_slots must be greater than 0");
+	else if (num_slots_on_old_cluster > max_replication_slots)
+		pg_fatal("max_replication_slots must be greater than or equal to the "
+				 "number of logical replication slots on the old cluster");
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	PQclear(res);
+
+	PQfinish(conn);
+
+	check_ok();
+}
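The check above amounts to two GUC validations against the count of slots found on the old cluster. As a rough standalone sketch of just the decision logic (the helper name and pure-function shape are illustrative assumptions, not the patch's actual code, which queries the server with SHOW):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical model of the new-cluster parameter check: returns NULL when
 * the settings permit slot creation, or a short reason string otherwise.
 */
static const char *
check_slot_upgrade_params(int max_replication_slots,
						  const char *wal_level,
						  int nslots_on_old_cluster)
{
	if (max_replication_slots == 0)
		return "max_replication_slots must be greater than 0";
	if (nslots_on_old_cluster > max_replication_slots)
		return "max_replication_slots is smaller than the number of old slots";
	if (strcmp(wal_level, "logical") != 0)
		return "wal_level must be \"logical\"";
	return NULL;				/* all preconditions satisfied */
}
```

Modeling it this way makes the ordering visible: the zero check fires before the comparison with the old cluster's slot count, matching the if/else chain in check_for_logical_replication_slots().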
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..e6b90864f5 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -59,6 +59,30 @@ generate_old_dump(void)
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
+		/*
+		 * Dump logical replication slots if needed.
+		 *
+		 * XXX We cannot dump replication slots at the same time as the schema
+		 * dump because we need to separate the timing of restoring
+		 * replication slots and other objects. Replication slots, in
+		 * particular, should not be restored before executing the pg_resetwal
+		 * command because it will remove WALs that are required by the slots.
+		 */
+		if (user_opts.include_logical_slots)
+		{
+			char		slots_file_name[MAXPGPATH];
+
+			snprintf(slots_file_name, sizeof(slots_file_name),
+					 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+			parallel_exec_prog(log_file_name, NULL,
+							   "\"%s/pg_dump\" %s --logical-replication-slots-only "
+							   "--quote-all-identifiers --binary-upgrade %s "
+							   "--file=\"%s/%s\" %s",
+							   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+							   log_opts.verbose ? "--verbose" : "",
+							   log_opts.dumpdir,
+							   slots_file_name, escaped_connstr.data);
+		}
 		termPQExpBuffer(&escaped_connstr);
 	}
 
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index a9988abfe1..8bc0ad2e10 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,7 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 
 /*
@@ -394,7 +395,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +601,96 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+
+	int			num_slots;
+
+	char		query[QUERY_ALLOC];
+
+	snprintf(query, sizeof(query),
+			 "SELECT slot_name, plugin, two_phase "
+			 "FROM pg_catalog.pg_replication_slots "
+			 "WHERE database = current_database() AND temporary = false "
+			 "AND wal_status IN ('reserved', 'extended');");
+
+	res = executeQueryOrDie(conn, "%s", query);
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher-level routine to generate LogicalSlotInfoArr for all databases.
+ * Returns the total number of logical replication slots found.
+ */
+int
+get_logical_slot_infos(ClusterInfo *cluster)
+{
+	int			dbnum;
+	int			num_slots = 0;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+		num_slots += pDbInfo->slot_arr.nslots;
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+
+	return num_slots;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +701,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * doing create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.slots == NULL);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -660,3 +757,15 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_arr->slots[slotnum].slotname,
+			   slot_arr->slots[slotnum].plugin,
+			   slot_arr->slots[slotnum].two_phase);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 640361009e..df66a5ffe6 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"include-logical-replication-slots", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.include_logical_slots = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -289,6 +294,8 @@ usage(void)
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --copy                        copy files to new cluster (default)\n"));
+	printf(_("  --include-logical-replication-slots\n"
+			 "                                upgrade logical replication slots\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..6dd3832422 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots if requested.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (user_opts.include_logical_slots)
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,50 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		slots_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		snprintf(slots_file_name, sizeof(slots_file_name),
+				 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		parallel_exec_prog(log_file_name,
+						   NULL,
+						   "\"%s/psql\" %s --echo-queries --set ON_ERROR_STOP=on "
+						   "--no-psqlrc --dbname %s -f \"%s/%s\"",
+						   new_cluster.bindir,
+						   cluster_conn_opts(&new_cluster),
+						   old_db->db_name,
+						   log_opts.dumpdir,
+						   slots_file_name);
+	}
+
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info again */
+	get_logical_slot_infos(&new_cluster);
+}
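Both the dump and restore sides locate the per-database SQL file through DB_DUMP_LOGICAL_SLOTS_FILE_MASK keyed by the database OID, which is how create_logical_replication_slots() pairs each old database with its slot script. A tiny sketch of that derivation (the wrapper function is hypothetical; the mask is the one added to pg_upgrade.h):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAXPGPATH 1024
/* Mask as added to pg_upgrade.h by the patch */
#define DB_DUMP_LOGICAL_SLOTS_FILE_MASK "pg_upgrade_dump_%u_logical_slots.sql"

/* The restore loop derives one slots SQL file per database from its OID. */
static void
build_slots_file_name(char *buf, size_t bufsize, unsigned int db_oid)
{
	snprintf(buf, bufsize, DB_DUMP_LOGICAL_SLOTS_FILE_MASK, db_oid);
}
```

Keying on the OID rather than the database name avoids any quoting or encoding issues in file names and matches the existing DB_DUMP_FILE_MASK convention.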
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..8034067492 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -29,6 +29,7 @@
 /* contains both global db information and CREATE DATABASE commands */
 #define GLOBALS_DUMP_FILE	"pg_upgrade_dump_globals.sql"
 #define DB_DUMP_FILE_MASK	"pg_upgrade_dump_%u.custom"
+#define DB_DUMP_LOGICAL_SLOTS_FILE_MASK	"pg_upgrade_dump_%u_logical_slots.sql"
 
 /*
  * Base directories that include all the files generated internally, from the
@@ -150,6 +151,22 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* Can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;
+	LogicalSlotInfo *slots;
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +193,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -304,6 +322,8 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		include_logical_slots;	/* true -> dump and restore logical
+										 * replication slots */
 } UserOpts;
 
 typedef struct
@@ -400,6 +420,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+int			get_logical_slot_infos(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..9ca266f6b2
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,146 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading logical replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old node
+my $old_node = PostgreSQL::Test::Cluster->new('old_node');
+$old_node->init(allows_streaming => 'logical');
+$old_node->start;
+
+# Initialize new node
+my $new_node = PostgreSQL::Test::Cluster->new('new_node');
+$new_node->init(allows_streaming => 1);
+
+my $bindir = $new_node->config_data('--bindir');
+
+$old_node->stop;
+
+# Cause a failure at the start of pg_upgrade because wal_level is replica
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with wrong wal_level');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparation for the subsequent test: fix wal_level, and set
+# max_replication_slots to 0, which is prohibited.
+$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+
+# Cause a failure at the start of pg_upgrade because max_replication_slots is 0
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with wrong max_replication_slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparation for the subsequent test: set max_replication_slots to a
+# non-zero value
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# Create a slot on old node, and generate WALs
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', qq[
+	SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true);
+	SELECT pg_create_logical_replication_slot('to_be_dropped', 'test_decoding', false, true);
+	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+]);
+
+$old_node->stop;
+
+# Cause a failure at the start of pg_upgrade because max_replication_slots is
+# smaller than existing slots on old node
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with small max_replication_slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparation for the subsequent test: set max_replication_slots to a
+# sufficiently large value
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+
+# Remove an unnecessary slot and consume WALs
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', qq[
+	SELECT pg_drop_replication_slot('to_be_dropped');
+	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)
+]);
+$old_node->stop;
+
+# Actual run, pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots'
+	],
+	'run of pg_upgrade of old node');
+ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+$new_node->start;
+my $result = $new_node->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot|t), 'check the slot exists on new node');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 05814136c6..5241b7bf8e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1497,7 +1497,10 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.34.1

From ab8839138521af35f6b00f530a39d10ef8ead555 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Fri, 21 Jul 2023 05:34:21 +0000
Subject: [PATCH 1/3] Send shutdown checkpoint record to subscriber

---
 src/backend/replication/walsender.c | 30 +++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..fc1363ba76 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -187,6 +187,9 @@ static bool WalSndCaughtUp = false;
 static volatile sig_atomic_t got_SIGUSR2 = false;
 static volatile sig_atomic_t got_STOPPING = false;
 
+/* Has all remaining WAL been flushed? */
+static bool WalsAreFlushed = false;
+
 /*
  * This is set while we are streaming. When not set
  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
@@ -260,7 +263,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
 
-
 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -1581,7 +1583,10 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * written, because walwriter has shut down already.
 		 */
 		if (got_STOPPING)
+		{
 			XLogBackgroundFlush();
+			WalsAreFlushed = true;
+		}
 
 		/* Update our idea of the currently flushed position. */
 		if (!RecoveryInProgress())
@@ -3100,12 +3105,20 @@ XLogSendLogical(void)
 		WalSndCaughtUp = true;
 
 	/*
-	 * If we're caught up and have been requested to stop, have WalSndLoop()
-	 * terminate the connection in an orderly manner, after writing out all
-	 * the pending data.
+	 * If we're caught up, have been requested to stop, and have no pending
+	 * data to send, switch to the stopping state.
 	 */
-	if (WalSndCaughtUp && got_STOPPING)
-		got_SIGUSR2 = true;
+	if (WalSndCaughtUp && WalsAreFlushed && !pq_is_send_pending())
+	{
+		/*
+		 * Report the stats forcibly. pgstat_shutdown_hook reports any pending
+		 * stats at process exit, but that would happen after the checkpointer
+		 * has exited and would lead to an assertion failure. We must ensure
+		 * all the stats are recorded before changing the state.
+		 */
+		pgstat_report_stat(true);
+		WalSndSetState(WALSNDSTATE_STOPPING);
+	}
 
 	/* Update shared memory status */
 	{
@@ -3142,6 +3155,7 @@ WalSndDone(WalSndSendDataCallback send_data)
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
+
 	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
 		!pq_is_send_pending())
 	{
@@ -3152,6 +3166,10 @@ WalSndDone(WalSndSendDataCallback send_data)
 		EndCommand(&qc, DestRemote, false);
 		pq_flush();
 
+		/*
+		 * Mark the slot as dirty and save it so that the updated
+		 * confirmed_flush LSN is persisted to disk.
+		 */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+
 		proc_exit(0);
 	}
 	if (!waiting_for_ping_response)
-- 
2.34.1
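The state transition added to XLogSendLogical() above is gated on three conditions: the walsender has caught up, all remaining WAL has been flushed (WalsAreFlushed), and the output buffer is empty (!pq_is_send_pending()). A trivial standalone model of that predicate, useful for checking the truth table (the function name is illustrative):

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Model of the new exit condition in XLogSendLogical(): the logical
 * walsender may switch to WALSNDSTATE_STOPPING only once it has caught up,
 * all WAL has been flushed, and nothing is pending in the output buffer.
 */
static bool
can_enter_stopping_state(bool caught_up, bool wals_flushed, bool send_pending)
{
	return caught_up && wals_flushed && !send_pending;
}
```

Requiring all three conditions ensures the shutdown checkpoint (step 2 in the proposed workflow) only begins after every decoded change has actually left the walsender.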

From df324a756b175ad9014d8dfe439fea1fa8137029 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 28 Jul 2023 16:50:52 +0530
Subject: [PATCH 3/3] Move pg_get_wal_records_info functionality from
 pg_walinspect to backend.

Upgrading publications requires pg_get_wal_records_info to check that there
are no WAL records other than the CHECKPOINT_SHUTDOWN record left to be
consumed. Hence, move the pg_get_wal_records_info functionality to the
backend as pg_get_wal_records_content so that it can be called from
pg_upgrade.
---
 contrib/pg_walinspect/Makefile                |   2 +-
 contrib/pg_walinspect/meson.build             |   1 +
 .../pg_walinspect/pg_walinspect--1.1--1.2.sql |  32 ++++
 contrib/pg_walinspect/pg_walinspect.c         | 148 +-----------------
 contrib/pg_walinspect/pg_walinspect.control   |   2 +-
 doc/src/sgml/func.sgml                        |  52 ++++++
 src/backend/access/transam/xlog.c             |  27 ++++
 src/backend/access/transam/xlogfuncs.c        | 125 +++++++++++++++
 src/backend/access/transam/xlogutils.c        | 117 ++++++++++++++
 src/backend/catalog/system_functions.sql      |   4 +
 src/include/access/xlog.h                     |   2 +
 src/include/access/xlogutils.h                |   4 +
 src/include/catalog/pg_proc.dat               |   9 ++
 src/test/modules/brin/t/02_wal_consistency.pl |   2 +-
 .../modules/test_custom_rmgrs/t/001_basic.pl  |   2 +-
 src/test/regress/expected/misc_functions.out  |  79 ++++++++++
 src/test/regress/sql/misc_functions.sql       |  57 +++++++
 17 files changed, 515 insertions(+), 150 deletions(-)
 create mode 100644 contrib/pg_walinspect/pg_walinspect--1.1--1.2.sql

diff --git a/contrib/pg_walinspect/Makefile b/contrib/pg_walinspect/Makefile
index 22090f7716..5cc7d81b42 100644
--- a/contrib/pg_walinspect/Makefile
+++ b/contrib/pg_walinspect/Makefile
@@ -7,7 +7,7 @@ OBJS = \
 PGFILEDESC = "pg_walinspect - functions to inspect contents of PostgreSQL Write-Ahead Log"
 
 EXTENSION = pg_walinspect
-DATA = pg_walinspect--1.0.sql pg_walinspect--1.0--1.1.sql
+DATA = pg_walinspect--1.0.sql pg_walinspect--1.0--1.1.sql pg_walinspect--1.1--1.2.sql
 
 REGRESS = pg_walinspect oldextversions
 
diff --git a/contrib/pg_walinspect/meson.build b/contrib/pg_walinspect/meson.build
index 80059f6119..8f7a99a493 100644
--- a/contrib/pg_walinspect/meson.build
+++ b/contrib/pg_walinspect/meson.build
@@ -20,6 +20,7 @@ install_data(
   'pg_walinspect.control',
   'pg_walinspect--1.0.sql',
   'pg_walinspect--1.0--1.1.sql',
+  'pg_walinspect--1.1--1.2.sql',
   kwargs: contrib_data_args,
 )
 
diff --git a/contrib/pg_walinspect/pg_walinspect--1.1--1.2.sql b/contrib/pg_walinspect/pg_walinspect--1.1--1.2.sql
new file mode 100644
index 0000000000..41ec538623
--- /dev/null
+++ b/contrib/pg_walinspect/pg_walinspect--1.1--1.2.sql
@@ -0,0 +1,32 @@
+/* contrib/pg_walinspect/pg_walinspect--1.1--1.2.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION pg_walinspect UPDATE TO '1.2'" to load this file. \quit
+
+/* This function is now provided by the backend; callers should use the in-core version instead. */
+ALTER EXTENSION pg_walinspect DROP FUNCTION pg_get_wal_records_info;
+DROP FUNCTION pg_get_wal_records_info(pg_lsn, pg_lsn);
+
+--
+-- pg_get_wal_records_info()
+--
+CREATE FUNCTION pg_get_wal_records_info(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'pg_get_wal_records_content'
+LANGUAGE INTERNAL STRICT PARALLEL SAFE;
+
+REVOKE EXECUTE ON FUNCTION pg_get_wal_records_info(pg_lsn, pg_lsn) FROM PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_get_wal_records_info(pg_lsn, pg_lsn) TO pg_read_server_files;
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..6e572d3436 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -38,10 +38,6 @@ PG_FUNCTION_INFO_V1(pg_get_wal_records_info_till_end_of_wal);
 PG_FUNCTION_INFO_V1(pg_get_wal_stats);
 PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
 
-static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
-static XLogRecPtr GetCurrentLSN(void);
-static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
-static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
 static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
 							 bool *nulls, uint32 ncols);
 static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
@@ -62,120 +58,6 @@ static void GetWalStats(FunctionCallInfo fcinfo,
 static void GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 							bool show_data);
 
-/*
- * Return the LSN up to which the server has WAL.
- */
-static XLogRecPtr
-GetCurrentLSN(void)
-{
-	XLogRecPtr	curr_lsn;
-
-	/*
-	 * We determine the current LSN of the server similar to how page_read
-	 * callback read_local_xlog_page_no_wait does.
-	 */
-	if (!RecoveryInProgress())
-		curr_lsn = GetFlushRecPtr(NULL);
-	else
-		curr_lsn = GetXLogReplayRecPtr(NULL);
-
-	Assert(!XLogRecPtrIsInvalid(curr_lsn));
-
-	return curr_lsn;
-}
-
-/*
- * Initialize WAL reader and identify first valid LSN.
- */
-static XLogReaderState *
-InitXLogReaderState(XLogRecPtr lsn)
-{
-	XLogReaderState *xlogreader;
-	ReadLocalXLogPageNoWaitPrivate *private_data;
-	XLogRecPtr	first_valid_record;
-
-	/*
-	 * Reading WAL below the first page of the first segments isn't allowed.
-	 * This is a bootstrap WAL page and the page_read callback fails to read
-	 * it.
-	 */
-	if (lsn < XLOG_BLCKSZ)
-		ereport(ERROR,
-				(errmsg("could not read WAL at LSN %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	private_data = (ReadLocalXLogPageNoWaitPrivate *)
-		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
-
-	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
-											   .segment_open = &wal_segment_open,
-											   .segment_close = &wal_segment_close),
-									private_data);
-
-	if (xlogreader == NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("out of memory"),
-				 errdetail("Failed while allocating a WAL reading processor.")));
-
-	/* first find a valid recptr to start from */
-	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
-
-	if (XLogRecPtrIsInvalid(first_valid_record))
-		ereport(ERROR,
-				(errmsg("could not find a valid record after %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	return xlogreader;
-}
-
-/*
- * Read next WAL record.
- *
- * By design, to be less intrusive in a running system, no slot is allocated
- * to reserve the WAL we're about to read. Therefore this function can
- * encounter read errors for historical WAL.
- *
- * We guard against ordinary errors trying to read WAL that hasn't been
- * written yet by limiting end_lsn to the flushed WAL, but that can also
- * encounter errors if the flush pointer falls in the middle of a record. In
- * that case we'll return NULL.
- */
-static XLogRecord *
-ReadNextXLogRecord(XLogReaderState *xlogreader)
-{
-	XLogRecord *record;
-	char	   *errormsg;
-
-	record = XLogReadRecord(xlogreader, &errormsg);
-
-	if (record == NULL)
-	{
-		ReadLocalXLogPageNoWaitPrivate *private_data;
-
-		/* return NULL, if end of WAL is reached */
-		private_data = (ReadLocalXLogPageNoWaitPrivate *)
-			xlogreader->private_data;
-
-		if (private_data->end_of_wal)
-			return NULL;
-
-		if (errormsg)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
-		else
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
-	}
-
-	return record;
-}
-
 /*
  * Output values that make up a row describing caller's WAL record.
  *
@@ -502,33 +384,6 @@ pg_get_wal_record_info(PG_FUNCTION_ARGS)
 #undef PG_GET_WAL_RECORD_INFO_COLS
 }
 
-/*
- * Validate start and end LSNs coming from the function inputs.
- *
- * If end_lsn is found to be higher than the current LSN reported by the
- * cluster, use the current LSN as the upper bound.
- */
-static void
-ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
-{
-	XLogRecPtr	curr_lsn = GetCurrentLSN();
-
-	if (start_lsn > curr_lsn)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("WAL start LSN must be less than current LSN"),
-				 errdetail("Current WAL LSN on the database system is at %X/%X.",
-						   LSN_FORMAT_ARGS(curr_lsn))));
-
-	if (start_lsn > *end_lsn)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("WAL start LSN must be less than end LSN")));
-
-	if (*end_lsn > curr_lsn)
-		*end_lsn = curr_lsn;
-}
-
 /*
  * Get info of all WAL records between start LSN and end LSN.
  */
@@ -582,7 +437,8 @@ GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
 }
 
 /*
- * Get info of all WAL records between start LSN and end LSN.
+ * The following functions were removed from the SQL interface in version 1.2,
+ * but their C entry points are kept for compatibility with older extension
+ * versions.
  */
 Datum
 pg_get_wal_records_info(PG_FUNCTION_ARGS)
diff --git a/contrib/pg_walinspect/pg_walinspect.control b/contrib/pg_walinspect/pg_walinspect.control
index efa3cb2cfe..5f574b865b 100644
--- a/contrib/pg_walinspect/pg_walinspect.control
+++ b/contrib/pg_walinspect/pg_walinspect.control
@@ -1,5 +1,5 @@
 # pg_walinspect extension
 comment = 'functions to inspect contents of PostgreSQL Write-Ahead Log'
-default_version = '1.1'
+default_version = '1.2'
 module_pathname = '$libdir/pg_walinspect'
 relocatable = true
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index b94827674c..a8866feaa3 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -26670,6 +26670,37 @@ LOG:  Grand total: 1651920 bytes in 201 blocks; 622360 free (88 chunks); 1029560
         get the replication lag.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_get_wal_records_content</primary>
+        </indexterm>
+        <function>pg_get_wal_records_content</function> ( <parameter>start_lsn</parameter> <type>pg_lsn</type>, <parameter>end_lsn</parameter> <type>pg_lsn</type> )
+        <returnvalue>record</returnvalue>
+        ( <parameter>start_lsn</parameter> <type>pg_lsn</type>,
+        <parameter>end_lsn</parameter> <type>pg_lsn</type>,
+        <parameter>prev_lsn</parameter> <type>pg_lsn</type>,
+        <parameter>xid</parameter> <type>xid</type>,
+        <parameter>resource_manager</parameter> <type>text</type>,
+        <parameter>record_type</parameter> <type>text</type>,
+        <parameter>record_length</parameter> <type>integer</type>,
+        <parameter>main_data_length</parameter> <type>integer</type>,
+        <parameter>fpi_length</parameter> <type>integer</type>,
+        <parameter>description</parameter> <type>text</type>,
+        <parameter>block_ref</parameter> <type>text</type>
+         )
+       </para>
+       <para>
+        Gets information of all the valid WAL records between
+        <replaceable>start_lsn</replaceable> and
+        <replaceable>end_lsn</replaceable>.
+        Returns one row per WAL record. This can be used with some of
+        the functions shown in <xref linkend="functions-admin-backup-table"/>
+        to inspect recent WAL activity. The function raises an
+        error if <replaceable>start_lsn</replaceable> is not available.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
@@ -26728,6 +26759,27 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
 </programlisting>
    </para>
 
+   <para>
+    <function>pg_get_wal_records_content</function> gets information of all the valid WAL records between
+     <replaceable>start_lsn</replaceable> and <replaceable>end_lsn</replaceable>.
+     Returns one row per WAL record.  For example:
+<screen>
+postgres=# SELECT * FROM pg_get_wal_records_content('0/1E913618', '0/1E913740') LIMIT 1;
+-[ RECORD 1 ]----+--------------------------------------------------------------
+start_lsn        | 0/1E913618
+end_lsn          | 0/1E913650
+prev_lsn         | 0/1E9135A0
+xid              | 0
+resource_manager | Standby
+record_type      | RUNNING_XACTS
+record_length    | 50
+main_data_length | 24
+fpi_length       | 0
+description      | nextXid 33775 latestCompletedXid 33774 oldestRunningXid 33775
+block_ref        |
+</screen>
+   </para>
+
   </sect2>
 
   <sect2 id="functions-recovery-control">
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60c0b7ec3a..6115df57fd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8988,3 +8988,30 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Validate start and end LSNs coming from the function inputs.
+ *
+ * If end_lsn is found to be higher than the current LSN reported by the
+ * cluster, use the current LSN as the upper bound.
+ */
+void
+ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
+{
+	XLogRecPtr	curr_lsn = GetCurrentLSN();
+
+	if (start_lsn > curr_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than current LSN"),
+				 errdetail("Current WAL LSN on the database system is at %X/%X.",
+						   LSN_FORMAT_ARGS(curr_lsn))));
+
+	if (start_lsn > *end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	if (*end_lsn > curr_lsn)
+		*end_lsn = curr_lsn;
+}
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 5044ff0643..bb66eb26b6 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -754,3 +754,128 @@ pg_promote(PG_FUNCTION_ARGS)
 						   wait_seconds)));
 	PG_RETURN_BOOL(false);
 }
+
+/*
+ * Get content of all WAL records between start LSN and end LSN.
+ */
+Datum
+pg_get_wal_records_content(PG_FUNCTION_ARGS)
+{
+#define PG_GET_WAL_RECORDS_INFO_COLS 11
+	FuncCallContext *funcctx;
+	XLogReaderState *xlogreader;
+
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+		TupleDesc	tupdesc;
+
+		ValidateInputLSNs(start_lsn, &end_lsn);
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * Switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* build tupdesc for result tuples */
+		tupdesc = CreateTemplateTupleDesc(PG_GET_WAL_RECORDS_INFO_COLS);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "start_lsn",
+						   PG_LSNOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "end_lsn",
+						   PG_LSNOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prev_lsn",
+						   PG_LSNOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xid",
+						   XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "resource_manager",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "record_type",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 7, "record_length",
+						   INT4OID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 8, "main_data_length",
+						   INT4OID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 9, "fpi_length",
+						   INT4OID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 10, "description",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 11, "block_ref",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		if (start_lsn < end_lsn)
+			funcctx->user_fctx = InitXLogReaderState(start_lsn);
+		else
+			funcctx->user_fctx = NULL;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	xlogreader = (XLogReaderState *) funcctx->user_fctx;
+
+	while (xlogreader && ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
+		bool		nulls[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
+		HeapTuple	tuple;
+		Datum		result;
+		XLogReaderState *record = xlogreader;
+		const char *record_type;
+		StringInfoData rec_desc;
+		StringInfoData rec_blk_ref;
+		int			i = 0;
+		uint32		fpi_len = 0;
+		RmgrData	desc;
+
+		desc = GetRmgr(XLogRecGetRmid(record));
+		record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+		if (record_type == NULL)
+			record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+		initStringInfo(&rec_desc);
+		desc.rm_desc(&rec_desc, record);
+
+		if (XLogRecHasAnyBlockRefs(record))
+		{
+			initStringInfo(&rec_blk_ref);
+			XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+		}
+
+		values[i++] = LSNGetDatum(record->ReadRecPtr);
+		values[i++] = LSNGetDatum(record->EndRecPtr);
+		values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+		values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+		values[i++] = CStringGetTextDatum(desc.rm_name);
+		values[i++] = CStringGetTextDatum(record_type);
+		values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+		values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+		values[i++] = UInt32GetDatum(fpi_len);
+
+		if (rec_desc.len > 0)
+			values[i++] = CStringGetTextDatum(rec_desc.data);
+		else
+			nulls[i++] = true;
+
+		if (XLogRecHasAnyBlockRefs(record))
+			values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+		else
+			nulls[i++] = true;
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		result = HeapTupleGetDatum(tuple);
+		SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+#undef PG_GET_WAL_RECORDS_INFO_COLS
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index e174a2a891..90eab3e2f3 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1048,3 +1048,120 @@ WALReadRaiseError(WALReadError *errinfo)
 						errinfo->wre_req)));
 	}
 }
+
+/*
+ * Return the LSN up to which the server has WAL.
+ */
+XLogRecPtr
+GetCurrentLSN(void)
+{
+	XLogRecPtr	curr_lsn;
+
+	/*
+	 * We determine the current LSN of the server similar to how page_read
+	 * callback read_local_xlog_page_no_wait does.
+	 */
+	if (!RecoveryInProgress())
+		curr_lsn = GetFlushRecPtr(NULL);
+	else
+		curr_lsn = GetXLogReplayRecPtr(NULL);
+
+	Assert(!XLogRecPtrIsInvalid(curr_lsn));
+
+	return curr_lsn;
+}
+
+/*
+ * Initialize WAL reader and identify first valid LSN.
+ */
+XLogReaderState *
+InitXLogReaderState(XLogRecPtr lsn)
+{
+	XLogReaderState *xlogreader;
+	ReadLocalXLogPageNoWaitPrivate *private_data;
+	XLogRecPtr	first_valid_record;
+
+	/*
+	 * Reading WAL below the first page of the first segments isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	private_data = (ReadLocalXLogPageNoWaitPrivate *)
+		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									private_data);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(ERROR,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+	}
+
+	return xlogreader;
+}
+
+
+/*
+ * Read next WAL record.
+ *
+ * By design, to be less intrusive in a running system, no slot is allocated
+ * to reserve the WAL we're about to read. Therefore this function can
+ * encounter read errors for historical WAL.
+ *
+ * We guard against ordinary errors trying to read WAL that hasn't been
+ * written yet by limiting end_lsn to the flushed WAL, but that can also
+ * encounter errors if the flush pointer falls in the middle of a record. In
+ * that case we'll return NULL.
+ */
+XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		ReadLocalXLogPageNoWaitPrivate *private_data;
+
+		/* return NULL, if end of WAL is reached */
+		private_data = (ReadLocalXLogPageNoWaitPrivate *)
+			xlogreader->private_data;
+
+		if (private_data->end_of_wal)
+			return NULL;
+
+		if (errormsg)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X: %s",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+	}
+
+	return record;
+}
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 07c0d89c4f..1fed0def9b 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -616,6 +616,8 @@ REVOKE EXECUTE ON FUNCTION pg_backup_stop(boolean) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn) FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
@@ -726,6 +728,8 @@ REVOKE EXECUTE ON FUNCTION pg_ls_replslotdir(text) FROM PUBLIC;
 -- We also set up some things as accessible to standard roles.
 --
 
+GRANT EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn) TO pg_read_server_files;
+
 GRANT EXECUTE ON FUNCTION pg_ls_logdir() TO pg_monitor;
 
 GRANT EXECUTE ON FUNCTION pg_ls_waldir() TO pg_monitor;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 48ca852381..4ec1fc9d82 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -259,6 +259,8 @@ extern void SetInstallXLogFileSegmentActive(void);
 extern bool IsInstallXLogFileSegmentActive(void);
 extern void XLogShutdownWalRcv(void);
 
+extern void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
+
 /*
  * Routines to start, stop, and get status of a base backup.
  */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5b77b11f50..77e2974ab3 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -115,4 +115,8 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern XLogRecPtr GetCurrentLSN(void);
+extern XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
+extern XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..d45696fca0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6489,6 +6489,15 @@
   proargnames => '{rm_id, rm_name, rm_builtin}',
   prosrc => 'pg_get_wal_resource_managers' },
 
+{ oid => '3813',
+  descr => 'Info of WAL contents between start LSN and end LSN',
+  proname => 'pg_get_wal_records_content', prorows => '10', proretset => 't',
+  provolatile => 's', prorettype => 'record', proargtypes => 'pg_lsn pg_lsn',
+  proallargtypes => '{pg_lsn,pg_lsn,pg_lsn,pg_lsn,pg_lsn,xid,text,text,int4,int4,int4,text,text}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{start_lsn,end_lsn,start_lsn,end_lsn,prev_lsn,xid,resource_manager,record_type,record_length,main_data_length,fpi_length,description,block_ref}',
+  prosrc => 'pg_get_wal_records_content' },
+
 { oid => '2621', descr => 'reload configuration files',
   proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/test/modules/brin/t/02_wal_consistency.pl b/src/test/modules/brin/t/02_wal_consistency.pl
index 8b2b244feb..288bbd7a60 100644
--- a/src/test/modules/brin/t/02_wal_consistency.pl
+++ b/src/test/modules/brin/t/02_wal_consistency.pl
@@ -64,7 +64,7 @@ my $end_lsn = $whiskey->lsn('flush');
 
 my ($ret, $out, $err) = $whiskey->psql(
 	'postgres', qq{
-	select count(*) from pg_get_wal_records_info('$start_lsn', '$end_lsn')
+	select count(*) from pg_get_wal_records_content('$start_lsn', '$end_lsn')
 	where resource_manager = 'BRIN' AND
 	record_type ILIKE '%revmap%'
 	});
diff --git a/src/test/modules/test_custom_rmgrs/t/001_basic.pl b/src/test/modules/test_custom_rmgrs/t/001_basic.pl
index 50655d3788..4000de0560 100644
--- a/src/test/modules/test_custom_rmgrs/t/001_basic.pl
+++ b/src/test/modules/test_custom_rmgrs/t/001_basic.pl
@@ -54,7 +54,7 @@ my $expected =
   qq($record_end_lsn|test_custom_rmgrs|TEST_CUSTOM_RMGRS_MESSAGE|0|payload (10 bytes): payload123);
 my $result = $node->safe_psql(
 	'postgres',
-	qq[SELECT end_lsn, resource_manager, record_type, fpi_length, description FROM pg_get_wal_records_info('$start_lsn', '$end_lsn')
+	qq[SELECT end_lsn, resource_manager, record_type, fpi_length, description FROM pg_get_wal_records_content('$start_lsn', '$end_lsn')
 		WHERE resource_manager = 'test_custom_rmgrs';]);
 is($result, $expected,
 	'custom WAL resource manager has successfully written a WAL record');
diff --git a/src/test/regress/expected/misc_functions.out b/src/test/regress/expected/misc_functions.out
index c669948370..e5e10cddf9 100644
--- a/src/test/regress/expected/misc_functions.out
+++ b/src/test/regress/expected/misc_functions.out
@@ -642,3 +642,82 @@ SELECT segment_number > 0 AS ok_segment_number, timeline_id
  t                 |  4294967295
 (1 row)
 
+-- pg_get_wal_records_content
+CREATE TABLE sample_tbl(col1 int, col2 int);
+SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
+INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
+SELECT pg_current_wal_lsn() AS wal_lsn2 \gset
+-- Mask DETAIL messages as these could refer to current LSN positions.
+\set VERBOSITY terse
+-- Invalid start LSN.
+SELECT * FROM pg_get_wal_records_content('0/0', :'wal_lsn1');
+ERROR:  could not read WAL at LSN 0/0
+-- Start LSN > End LSN.
+SELECT * FROM pg_get_wal_records_content(:'wal_lsn2', :'wal_lsn1');
+ERROR:  WAL start LSN must be less than end LSN
+-- Success with end LSNs.
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
+ ok 
+----
+ t
+(1 row)
+
+-- Failures with start LSNs.
+SELECT * FROM pg_get_wal_records_content('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
+ERROR:  WAL start LSN must be less than current LSN
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2');
+ ok 
+----
+ t
+(1 row)
+
+-- Test for filtering out WAL records of a particular table
+SELECT oid AS sample_tbl_oid FROM pg_class WHERE relname = 'sample_tbl' \gset
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2')
+			WHERE block_ref LIKE concat('%', :'sample_tbl_oid', '%') AND resource_manager = 'Heap';
+ ok 
+----
+ t
+(1 row)
+
+-- Test for filtering out WAL records based on resource_manager and
+-- record_type
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2')
+			WHERE resource_manager = 'Heap' AND record_type = 'INSERT';
+ ok 
+----
+ t
+(1 row)
+
+\set VERBOSITY default
+-- Tests for permissions
+CREATE ROLE regress_pg_get_wal;
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- no
+ has_function_privilege 
+------------------------
+ f
+(1 row)
+
+-- Functions accessible by users with role pg_read_server_files.
+GRANT pg_read_server_files TO regress_pg_get_wal;
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- yes
+ has_function_privilege 
+------------------------
+ t
+(1 row)
+
+-- Superuser can grant execute to other users.
+GRANT EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn)
+  TO regress_pg_get_wal;
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- yes
+ has_function_privilege 
+------------------------
+ t
+(1 row)
+
+REVOKE EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn)
+  FROM regress_pg_get_wal;
+DROP ROLE regress_pg_get_wal;
diff --git a/src/test/regress/sql/misc_functions.sql b/src/test/regress/sql/misc_functions.sql
index b57f01f3e9..40cee85dbc 100644
--- a/src/test/regress/sql/misc_functions.sql
+++ b/src/test/regress/sql/misc_functions.sql
@@ -237,3 +237,60 @@ SELECT segment_number > 0 AS ok_segment_number, timeline_id
   FROM pg_split_walfile_name('000000010000000100000000');
 SELECT segment_number > 0 AS ok_segment_number, timeline_id
   FROM pg_split_walfile_name('ffffffFF00000001000000af');
+
+-- pg_get_wal_records_content
+CREATE TABLE sample_tbl(col1 int, col2 int);
+SELECT pg_current_wal_lsn() AS wal_lsn1 \gset
+INSERT INTO sample_tbl SELECT * FROM generate_series(1, 2);
+SELECT pg_current_wal_lsn() AS wal_lsn2 \gset
+
+-- Mask DETAIL messages as these could refer to current LSN positions.
+\set VERBOSITY terse
+
+-- Invalid start LSN.
+SELECT * FROM pg_get_wal_records_content('0/0', :'wal_lsn1');
+-- Start LSN > End LSN.
+SELECT * FROM pg_get_wal_records_content(:'wal_lsn2', :'wal_lsn1');
+-- Success with end LSNs.
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', 'FFFFFFFF/FFFFFFFF');
+-- Failures with start LSNs.
+SELECT * FROM pg_get_wal_records_content('FFFFFFFF/FFFFFFFE', 'FFFFFFFF/FFFFFFFF');
+
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2');
+
+-- Test for filtering out WAL records of a particular table
+SELECT oid AS sample_tbl_oid FROM pg_class WHERE relname = 'sample_tbl' \gset
+
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2')
+			WHERE block_ref LIKE concat('%', :'sample_tbl_oid', '%') AND resource_manager = 'Heap';
+
+-- Test for filtering out WAL records based on resource_manager and
+-- record_type
+
+SELECT COUNT(*) >= 1 AS ok FROM pg_get_wal_records_content(:'wal_lsn1', :'wal_lsn2')
+			WHERE resource_manager = 'Heap' AND record_type = 'INSERT';
+
+\set VERBOSITY default
+
+-- Tests for permissions
+CREATE ROLE regress_pg_get_wal;
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- no
+
+-- Functions accessible by users with role pg_read_server_files.
+GRANT pg_read_server_files TO regress_pg_get_wal;
+
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- yes
+
+-- Superuser can grant execute to other users.
+GRANT EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn)
+  TO regress_pg_get_wal;
+
+SELECT has_function_privilege('regress_pg_get_wal',
+  'pg_get_wal_records_content(pg_lsn, pg_lsn) ', 'EXECUTE'); -- yes
+
+REVOKE EXECUTE ON FUNCTION pg_get_wal_records_content(pg_lsn, pg_lsn)
+  FROM regress_pg_get_wal;
+
+DROP ROLE regress_pg_get_wal;
-- 
2.34.1
