From b4d7674ca163da5bcf1d2ddb793955a43d8c8c6c Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouhaud@free.fr>
Date: Fri, 14 Apr 2023 13:49:09 +0800
Subject: [PATCH v6] Persist to disk logical slots during a shutdown checkpoint
 if the updated confirmed_flush_lsn has not yet been persisted.

It's entirely possible for a logical slot to have a confirmed_flush_lsn higher
than the last value saved on disk while not being marked as dirty.  It's
currently not a problem to lose that value during a clean shutdown / restart
cycle, but a later patch adding support for pg_upgrade of publications and
logical slots will rely on that value being properly persisted to disk.

Author: Julien Rouhaud
Reviewed-by: Wang Wei, Peter Smith, Masahiko Sawada
---
 src/backend/access/transam/xlog.c             |   2 +-
 src/backend/replication/slot.c                |  29 +++--
 src/include/replication/slot.h                |  13 ++-
 src/test/subscription/meson.build             |   1 +
 src/test/subscription/t/034_always_persist.pl | 100 ++++++++++++++++++
 5 files changed, 132 insertions(+), 13 deletions(-)
 create mode 100644 src/test/subscription/t/034_always_persist.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60c0b7ec3a..6dced61cf4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7026,7 +7026,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
-	CheckPointReplicationSlots();
+	CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 567d61540a..f732d85158 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -70,7 +70,8 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
-static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+						   bool is_shutdown);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -282,6 +283,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -744,7 +746,7 @@ ReplicationSlotSave(void)
 	Assert(MyReplicationSlot != NULL);
 
 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
-	SaveSlotToPath(MyReplicationSlot, path, ERROR);
+	SaveSlotToPath(MyReplicationSlot, path, ERROR, false);
 }
 
 /*
@@ -1533,11 +1535,10 @@ restart:
 /*
  * Flush all replication slots to disk.
  *
- * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * is_shutdown is true in case of a shutdown checkpoint.
  */
 void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
 
@@ -1562,7 +1563,7 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
-		SaveSlotToPath(s, path, LOG);
+		SaveSlotToPath(s, path, LOG, is_shutdown);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
@@ -1668,7 +1669,7 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 	/* Write the actual state file. */
 	slot->dirty = true;			/* signal that we really need to write */
-	SaveSlotToPath(slot, tmppath, ERROR);
+	SaveSlotToPath(slot, tmppath, ERROR, false);
 
 	/* Rename the directory into place. */
 	if (rename(tmppath, path) != 0)
@@ -1694,22 +1695,26 @@ CreateSlotOnDisk(ReplicationSlot *slot)
  * Shared functionality between saving and creating a replication slot.
  */
 static void
-SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+			   bool is_shutdown)
 {
 	char		tmppath[MAXPGPATH];
 	char		path[MAXPGPATH];
 	int			fd;
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
+	bool		flush_lsn_changed;
 
 	/* first check whether there's something to write out */
 	SpinLockAcquire(&slot->mutex);
 	was_dirty = slot->dirty;
 	slot->just_dirtied = false;
+	flush_lsn_changed = (slot->data.confirmed_flush != slot->last_saved_confirmed_flush);
 	SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
+	/* Don't do anything if there's nothing to write. See ReplicationSlot. */
+	if (!was_dirty &&
+		!(is_shutdown && SlotIsLogical(slot) && flush_lsn_changed))
 		return;
 
 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
@@ -1834,11 +1839,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	/*
 	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
-	 * already.
+	 * already and remember the confirmed_flush LSN value.
 	 */
 	SpinLockAcquire(&slot->mutex);
 	if (!slot->just_dirtied)
 		slot->dirty = false;
+	slot->last_saved_confirmed_flush = slot->data.confirmed_flush;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2035,6 +2041,7 @@ RestoreSlotFromDisk(const char *name)
 		/* initialize in memory state */
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
+		slot->last_saved_confirmed_flush =  cp.slotdata.confirmed_flush;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a7d16a37a3..9146d6ede8 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -83,6 +83,17 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/*
+	 * We won't ensure that the slot is persisted after the confirmed_flush LSN
+	 * is updated as that could lead to frequent writes.  However, we need to
+	 * ensure that we do persist the slots at the time of shutdown whose
+	 * confirmed_flush LSN is changed since we last saved the slot to disk.
+	 * This will help in avoiding retreat of the confirmed_flush LSN after
+	 * restart.  This variable is used to track the last saved confirmed_flush
+	 * LSN value.
+	 */
+	XLogRecPtr	last_saved_confirmed_flush;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -146,7 +157,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index bd673a9d68..cdd2f8ba47 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/034_always_persist.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/034_always_persist.pl b/src/test/subscription/t/034_always_persist.pl
new file mode 100644
index 0000000000..7879c2d985
--- /dev/null
+++ b/src/test/subscription/t/034_always_persist.pl
@@ -0,0 +1,100 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always persisted to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub compare_confirmed_flush
+{
+	my ($node, $confirmed_flush_from_log) = @_;
+
+	# Fetch Latest checkpoint location from the control file.
+	my ($stdout, $stderr) = run_command([ 'pg_controldata', $node->data_dir ]);
+	my @control_data = split("\n", $stdout);
+	my $latest_checkpoint = undef;
+	foreach (@control_data)
+	{
+		if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
+		{
+			$latest_checkpoint = $1;
+			last;
+		}
+	}
+	die "Latest checkpoint location not found in control file\n"
+	  unless defined($latest_checkpoint);
+
+	# Is it same as the value read from log?
+	ok($latest_checkpoint eq $confirmed_flush_from_log,
+		"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location");
+
+	return;
+}
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
+$node_publisher->init(allows_streaming => 'logical');
+# Avoid checkpoint during the test, otherwise, the latest checkpoint location
+# change.
+$node_publisher->append_conf('postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create table
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_tbl"
+);
+
+is($result, qq(5), "check initial copy was done");
+
+my $offset = -s $node_publisher->logfile;
+
+# Restart the publisher to ensure that the slot will be persisted if required.
+$node_publisher->restart();
+
+# Wait until the walsender creates decoding context
+$node_publisher->wait_for_log(
+	qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
+	$offset
+);
+
+# Extract confirmed_flush from the logfile
+my $log_contents = slurp_file($node_publisher->logfile, $offset);
+$log_contents =~
+	qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
+	or die "could not get confirmed_flush_lsn";
+
+# Ensure that the slot's confirmed_flush LSN is the same as the
+# latest_checkpoint location.
+compare_confirmed_flush($node_publisher, $1);
+
+done_testing();
-- 
2.28.0.windows.1

