> On 11/21/24 14:59, Tomas Vondra wrote:
>
> I don't have a great idea how to improve this. It seems wrong for
> ReplicationSlotsComputeRequiredLSN() to calculate the LSN using values
> from dirty slots, so maybe it should simply retry if any slot is dirty?
> Or retry on that one slot? But various places update the restart_lsn
> before marking the slot as dirty, so right now this won't work.

To bump the thread, I would like to propose a new version of my patch. All the 
check-world tests seem to pass. 

The idea of the patch is pretty simple - keep flushed restart_lsn in memory and 
use this value to calculate required lsn in 
ReplicationSlotsComputeRequiredLSN().

One note - if restart_lsn_flushed is invalid, the restart_lsn value will be 
used instead. If we were to use an invalid restart_lsn_flushed in place of a 
valid restart_lsn, the slot would simply be skipped when computing the required 
LSN. At the moment I have no better idea how to deal with an invalid 
restart_lsn_flushed.

With best regards,
Vitaly
From a6fb33969213e5f5dd994853ac052df64372b85f Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Thu, 31 Oct 2024 12:29:12 +0300
Subject: [PATCH 1/2] Keep WAL segments by slot's flushed restart LSN

---
 src/backend/replication/slot.c      | 37 +++++++++++++++++++++++++++++
 src/backend/replication/walsender.c | 13 ++++++++++
 src/include/replication/slot.h      |  4 ++++
 3 files changed, 54 insertions(+)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4a206f9527..c3c44fe9f8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->restart_lsn_flushed = InvalidXLogRecPtr;
 	slot->inactive_since = 0;
 
 	/*
@@ -1142,20 +1143,34 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
+		persistency = s->data.persistency;
 		restart_lsn = s->data.restart_lsn;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+		{
+			if (restart_lsn_flushed != InvalidXLogRecPtr &&
+				restart_lsn > restart_lsn_flushed)
+			{
+				restart_lsn = restart_lsn_flushed;
+			}
+		}
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -1193,7 +1208,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1207,14 +1224,26 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 
 		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
+		persistency = s->data.persistency;
 		restart_lsn = s->data.restart_lsn;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+		{
+			if (restart_lsn_flushed != InvalidXLogRecPtr &&
+				restart_lsn > restart_lsn_flushed)
+			{
+				restart_lsn = restart_lsn_flushed;
+			}
+		}
+
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
@@ -1432,6 +1461,7 @@ ReplicationSlotReserveWal(void)
 
 	Assert(slot != NULL);
 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+	Assert(slot->restart_lsn_flushed == InvalidXLogRecPtr);
 
 	/*
 	 * The replication slot mechanism is used to prevent removal of required
@@ -1607,6 +1637,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		 */
 		SpinLockAcquire(&s->mutex);
 
+		Assert(s->data.restart_lsn >= s->restart_lsn_flushed);
+
 		restart_lsn = s->data.restart_lsn;
 
 		/* we do nothing if the slot is already invalid */
@@ -1691,7 +1723,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			 * just rely on .invalidated.
 			 */
 			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+			{
 				s->data.restart_lsn = InvalidXLogRecPtr;
+				s->restart_lsn_flushed = InvalidXLogRecPtr;
+			}
 
 			/* Let caller know */
 			*invalidated = true;
@@ -2189,6 +2224,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	if (!slot->just_dirtied)
 		slot->dirty = false;
 	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+	slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2386,6 +2422,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
 		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+		slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 371eef3ddd..03cdce23f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2329,6 +2329,7 @@ static void
 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	bool		changed = false;
+	XLogRecPtr	restart_lsn_flushed;
 	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(lsn != InvalidXLogRecPtr);
@@ -2336,6 +2337,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (slot->data.restart_lsn != lsn)
 	{
 		changed = true;
+		restart_lsn_flushed = slot->restart_lsn_flushed;
 		slot->data.restart_lsn = lsn;
 	}
 	SpinLockRelease(&slot->mutex);
@@ -2343,6 +2345,17 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
+
+		/* Save the replication slot on disk in case of its flushed restart_lsn
+		 * is invalid. Slots with invalid restart lsn are ignored when
+		 * calculating required LSN. Once we started to keep the WAL by flushed
+		 * restart LSN, we should save to disk an initial valid value.
+		 */
+		if (slot->data.persistency == RS_PERSISTENT) {
+			if (restart_lsn_flushed == InvalidXLogRecPtr && lsn != InvalidXLogRecPtr)
+				ReplicationSlotSave();
+		}
+
 		ReplicationSlotsComputeRequiredLSN();
 		PhysicalWakeupLogicalWalSnd();
 	}
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index d2cf786fd5..e66248b82d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -211,6 +211,10 @@ typedef struct ReplicationSlot
 	 * recently stopped.
 	 */
 	TimestampTz inactive_since;
+
+	/* Latest restart LSN that was flushed to disk */
+	XLogRecPtr restart_lsn_flushed;
+
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
-- 
2.34.1

From f950bb109563ce407a5972abbd2fc931ef18dbeb Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Fri, 13 Dec 2024 16:02:14 +0300
Subject: [PATCH 2/2] Fix src/recovery/t/001_stream_rep.pl after changes in
 restart lsn

---
 src/test/recovery/t/001_stream_rep.pl | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index f3ea45ac4a..95f61cabfc 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -553,6 +553,9 @@ chomp($phys_restart_lsn_post);
 ok( ($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0,
 	"physical slot advance persists across restarts");
 
+# Cleanup unused WAL segments
+$node_primary->safe_psql('postgres', "CHECKPOINT;");
+
 # Check if the previous segment gets correctly recycled after the
 # server stopped cleanly, causing a shutdown checkpoint to be generated.
 my $primary_data = $node_primary->data_dir;
-- 
2.34.1

Reply via email to