> On 11/21/24 14:59, Tomas Vondra wrote:
>
> I don't have a great idea how to improve this. It seems wrong for
> ReplicationSlotsComputeRequiredLSN() to calculate the LSN using values
> from dirty slots, so maybe it should simply retry if any slot is dirty?
> Or retry on that one slot? But various places update the restart_lsn
> before marking the slot as dirty, so right now this won't work.
To revive this thread, I would like to propose a new version of my patch. All check-world tests seem to pass. The idea of the patch is simple: keep the last flushed restart_lsn of each slot in memory, and use that value to calculate the required LSN in ReplicationSlotsComputeRequiredLSN(). One note: if restart_lsn_flushed is invalid, the in-memory restart_lsn value is used instead. Slots with an invalid restart LSN are ignored when calculating the required LSN. If we used an invalid restart_lsn_flushed instead of a valid restart_lsn, the slot would therefore be skipped entirely, and its WAL would not be protected. At the moment I have no better idea for handling an invalid restart_lsn_flushed.

With best regards,
Vitaly
From a6fb33969213e5f5dd994853ac052df64372b85f Mon Sep 17 00:00:00 2001 From: Vitaly Davydov <v.davy...@postgrespro.ru> Date: Thu, 31 Oct 2024 12:29:12 +0300 Subject: [PATCH 1/2] Keep WAL segments by slot's flushed restart LSN --- src/backend/replication/slot.c | 37 +++++++++++++++++++++++++++++ src/backend/replication/walsender.c | 13 ++++++++++ src/include/replication/slot.h | 4 ++++ 3 files changed, 54 insertions(+) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 4a206f9527..c3c44fe9f8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->restart_lsn_flushed = InvalidXLogRecPtr; slot->inactive_since = 0; /* @@ -1142,20 +1143,34 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn_flushed; bool invalidated; + ReplicationSlotPersistency persistency; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + restart_lsn_flushed = s->restart_lsn_flushed; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* truncate WAL for persistent slots by flushed restart_lsn */ + if (persistency == RS_PERSISTENT) + { + if (restart_lsn_flushed != InvalidXLogRecPtr && + restart_lsn > restart_lsn_flushed) + { + restart_lsn = restart_lsn_flushed; + } + } + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -1193,7 +1208,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn_flushed; bool 
invalidated; + ReplicationSlotPersistency persistency; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1207,14 +1224,26 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + restart_lsn_flushed = s->restart_lsn_flushed; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* truncate WAL for persistent slots by flushed restart_lsn */ + if (persistency == RS_PERSISTENT) + { + if (restart_lsn_flushed != InvalidXLogRecPtr && + restart_lsn > restart_lsn_flushed) + { + restart_lsn = restart_lsn_flushed; + } + } + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1432,6 +1461,7 @@ ReplicationSlotReserveWal(void) Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + Assert(slot->restart_lsn_flushed == InvalidXLogRecPtr); /* * The replication slot mechanism is used to prevent removal of required @@ -1607,6 +1637,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, */ SpinLockAcquire(&s->mutex); + Assert(s->data.restart_lsn >= s->restart_lsn_flushed); + restart_lsn = s->data.restart_lsn; /* we do nothing if the slot is already invalid */ @@ -1691,7 +1723,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * just rely on .invalidated. 
*/ if (invalidation_cause == RS_INVAL_WAL_REMOVED) + { s->data.restart_lsn = InvalidXLogRecPtr; + s->restart_lsn_flushed = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; @@ -2189,6 +2224,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) if (!slot->just_dirtied) slot->dirty = false; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->restart_lsn_flushed = cp.slotdata.restart_lsn; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2386,6 +2422,7 @@ RestoreSlotFromDisk(const char *name) slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->restart_lsn_flushed = cp.slotdata.restart_lsn; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 371eef3ddd..03cdce23f0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2329,6 +2329,7 @@ static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; + XLogRecPtr restart_lsn_flushed; ReplicationSlot *slot = MyReplicationSlot; Assert(lsn != InvalidXLogRecPtr); @@ -2336,6 +2337,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) if (slot->data.restart_lsn != lsn) { changed = true; + restart_lsn_flushed = slot->restart_lsn_flushed; slot->data.restart_lsn = lsn; } SpinLockRelease(&slot->mutex); @@ -2343,6 +2345,17 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) if (changed) { ReplicationSlotMarkDirty(); + + /* Save the replication slot on disk in case of its flushed restart_lsn + * is invalid. Slots with invalid restart lsn are ignored when + * calculating required LSN. Once we started to keep the WAL by flushed + * restart LSN, we should save to disk an initial valid value. 
+ */ + if (slot->data.persistency == RS_PERSISTENT) { + if (restart_lsn_flushed == InvalidXLogRecPtr && lsn != InvalidXLogRecPtr) + ReplicationSlotSave(); + } + ReplicationSlotsComputeRequiredLSN(); PhysicalWakeupLogicalWalSnd(); } diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index d2cf786fd5..e66248b82d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -211,6 +211,10 @@ typedef struct ReplicationSlot * recently stopped. */ TimestampTz inactive_since; + + /* Latest restart LSN that was flushed to disk */ + XLogRecPtr restart_lsn_flushed; + } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) -- 2.34.1
From f950bb109563ce407a5972abbd2fc931ef18dbeb Mon Sep 17 00:00:00 2001 From: Vitaly Davydov <v.davy...@postgrespro.ru> Date: Fri, 13 Dec 2024 16:02:14 +0300 Subject: [PATCH 2/2] Fix src/recovery/t/001_stream_rep.pl after changes in restart lsn --- src/test/recovery/t/001_stream_rep.pl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index f3ea45ac4a..95f61cabfc 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -553,6 +553,9 @@ chomp($phys_restart_lsn_post); ok( ($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0, "physical slot advance persists across restarts"); +# Cleanup unused WAL segments +$node_primary->safe_psql('postgres', "CHECKPOINT;"); + # Check if the previous segment gets correctly recycled after the # server stopped cleanly, causing a shutdown checkpoint to be generated. my $primary_data = $node_primary->data_dir; -- 2.34.1